diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2023-05-11 14:29:56 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2023-05-11 14:29:56 +0300 |
commit | e9a6dc7ae59dec89bdbb1d60bcbdc76bc3d583c8 (patch) | |
tree | 589c27a25b5d1be526469bc9fbea0070a5740d40 | |
parent | 717e3b3cfdb167e8b930323397dc6e852ef94f17 (diff) | |
parent | 1c6792d19e48381a12cc3b34598949b4159843a3 (diff) | |
download | mariadb-git-bb-10.9-MDEV-29911.tar.gz |
Mergebb-10.9-MDEV-29911
-rw-r--r-- | extra/mariabackup/xtrabackup.cc | 4 | ||||
-rw-r--r-- | storage/innobase/buf/buf0dblwr.cc | 2 | ||||
-rw-r--r-- | storage/innobase/buf/buf0flu.cc | 1 | ||||
-rw-r--r-- | storage/innobase/buf/buf0lru.cc | 9 | ||||
-rw-r--r-- | storage/innobase/buf/buf0rea.cc | 83 | ||||
-rw-r--r-- | storage/innobase/fil/fil0fil.cc | 68 | ||||
-rw-r--r-- | storage/innobase/include/buf0buf.h | 3 | ||||
-rw-r--r-- | storage/innobase/include/buf0rea.h | 11 | ||||
-rw-r--r-- | storage/innobase/include/log0recv.h | 211 | ||||
-rw-r--r-- | storage/innobase/include/os0file.h | 9 | ||||
-rw-r--r-- | storage/innobase/log/log0recv.cc | 1754 | ||||
-rw-r--r-- | storage/innobase/os/os0file.cc | 90 |
12 files changed, 1325 insertions, 920 deletions
diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc index 6f42a9be05a..180616f37e9 100644 --- a/extra/mariabackup/xtrabackup.cc +++ b/extra/mariabackup/xtrabackup.cc @@ -3146,7 +3146,7 @@ static bool xtrabackup_copy_logfile() if (log_sys.buf[recv_sys.offset] <= 1) break; - if (recv_sys.parse_mtr(STORE_NO) == recv_sys_t::OK) + if (recv_sys.parse_mtr<false>(false) == recv_sys_t::OK) { do { @@ -3156,7 +3156,7 @@ static bool xtrabackup_copy_logfile() sequence_offset)); *seq= 1; } - while ((r= recv_sys.parse_mtr(STORE_NO)) == recv_sys_t::OK); + while ((r= recv_sys.parse_mtr<false>(false)) == recv_sys_t::OK); if (ds_write(dst_log_file, log_sys.buf + start_offset, recv_sys.offset - start_offset)) diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc index 1260145ed1c..510ad02256d 100644 --- a/storage/innobase/buf/buf0dblwr.cc +++ b/storage/innobase/buf/buf0dblwr.cc @@ -372,7 +372,7 @@ void buf_dblwr_t::recover() const uint32_t space_id= page_get_space_id(page); const page_id_t page_id(space_id, page_no); - if (recv_sys.lsn < lsn) + if (recv_sys.scanned_lsn < lsn) { ib::info() << "Ignoring a doublewrite copy of page " << page_id << " with future log sequence number " << lsn; diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index acfef5daad2..d7622bcd0c3 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -2583,6 +2583,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init() /** Flush the buffer pool on shutdown. */ ATTRIBUTE_COLD void buf_flush_buffer_pool() { + ut_ad(!os_aio_pending_reads()); ut_ad(!buf_page_cleaner_is_active); ut_ad(!buf_flush_sync_lsn); diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc index 724aa641f12..8a25e9c5266 100644 --- a/storage/innobase/buf/buf0lru.cc +++ b/storage/innobase/buf/buf0lru.cc @@ -1093,7 +1093,11 @@ static bool buf_LRU_block_remove_hashed(buf_page_t *bpage, const page_id_t id, ut_a(!zip || !bpage->oldest_modification()); ut_ad(bpage->zip_size()); - + /* Skip consistency checks if the page was freed. + In recovery, we could get a sole FREE_PAGE record + and nothing else, for a ROW_FORMAT=COMPRESSED page. + Its contents would be garbage. */ + if (!bpage->is_freed()) switch (fil_page_get_type(page)) { case FIL_PAGE_TYPE_ALLOCATED: case FIL_PAGE_INODE: @@ -1224,6 +1228,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state) buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold()); page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain); + recv_sys.free_corrupted_page(id); mysql_mutex_lock(&mutex); hash_lock.lock(); @@ -1248,8 +1253,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state) buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage)); mysql_mutex_unlock(&mutex); - - recv_sys.free_corrupted_page(id); } /** Update buf_pool.LRU_old_ratio. diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc index 3d7b1f29fac..f2eac1b0f95 100644 --- a/storage/innobase/buf/buf0rea.cc +++ b/storage/innobase/buf/buf0rea.cc @@ -658,60 +658,35 @@ failed: return count; } -/** @return whether a page has been freed */ -inline bool fil_space_t::is_freed(uint32_t page) +/** Schedule a page for recovery. +@param space tablespace +@param page_id page identifier +@param recs log records +@param init page initialization, or nullptr if the page needs to be read */ +void buf_read_recover(fil_space_t *space, const page_id_t page_id, + page_recv_t &recs, recv_init *init) { - std::lock_guard<std::mutex> freed_lock(freed_range_mutex); - return freed_ranges.contains(page); -} - -/** Issues read requests for pages which recovery wants to read in. -@param space_id tablespace identifier -@param page_nos page numbers to read, in ascending order */ -void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos) -{ - fil_space_t* space = fil_space_t::get(space_id); - - if (!space) { - /* The tablespace is missing or unreadable: do nothing */ - return; - } - - const ulint zip_size = space->zip_size(); - - for (ulint i = 0; i < page_nos.size(); i++) { - - /* Ignore if the page already present in freed ranges. */ - if (space->is_freed(page_nos[i])) { - continue; - } - - const page_id_t cur_page_id(space_id, page_nos[i]); - - ulint limit = 0; - for (ulint j = 0; j < buf_pool.n_chunks; j++) { - limit += buf_pool.chunks[j].size / 2; - } + ut_ad(space->id == page_id.space()); + space->reacquire(); + const ulint zip_size= space->zip_size(); - if (os_aio_pending_reads() >= limit) { - os_aio_wait_until_no_pending_reads(false); - } - - space->reacquire(); - switch (buf_read_page_low(space, false, BUF_READ_ANY_PAGE, - cur_page_id, zip_size, true)) { - case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC: - break; - default: - sql_print_error("InnoDB: Recovery failed to read page " - UINT32PF " from %s", - cur_page_id.page_no(), - space->chain.start->name); - } - } - - - DBUG_PRINT("ib_buf", ("recovery read (%zu pages) for %s", - page_nos.size(), space->chain.start->name)); - space->release(); + if (init) + { + if (buf_page_t *bpage= buf_page_init_for_read(BUF_READ_ANY_PAGE, page_id, + zip_size, true)) + { + ut_ad(bpage->in_file()); + os_fake_read(IORequest{bpage, (buf_tmp_buffer_t*) &recs, + UT_LIST_GET_FIRST(space->chain), + IORequest::READ_ASYNC}, ptrdiff_t(init)); + } + } + else if (dberr_t err= buf_read_page_low(space, false, BUF_READ_ANY_PAGE, + page_id, zip_size, true)) + { + if (err != DB_SUCCESS_LOCKED_REC) + sql_print_error("InnoDB: Recovery failed to read page " + UINT32PF " from %s", + page_id.page_no(), space->chain.start->name); + } } diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc index be313140225..e4b352a05aa 100644 --- a/storage/innobase/fil/fil0fil.cc +++ b/storage/innobase/fil/fil0fil.cc @@ -2775,53 +2775,55 @@ func_exit: #include <tpool.h> -/** Callback for AIO completion */ -void fil_aio_callback(const IORequest &request) +void IORequest::write_complete() const { ut_ad(fil_validate_skip()); - ut_ad(request.node); + ut_ad(node); + ut_ad(is_write()); - if (!request.bpage) + if (!bpage) { ut_ad(!srv_read_only_mode); - if (request.type == IORequest::DBLWR_BATCH) - buf_dblwr.flush_buffered_writes_completed(request); + if (type == IORequest::DBLWR_BATCH) + buf_dblwr.flush_buffered_writes_completed(*this); else - ut_ad(request.type == IORequest::WRITE_ASYNC); -write_completed: - request.node->complete_write(); - } - else if (request.is_write()) - { - buf_page_write_complete(request); - goto write_completed; + ut_ad(type == IORequest::WRITE_ASYNC); } else - { - ut_ad(request.is_read()); + buf_page_write_complete(*this); - /* IMPORTANT: since i/o handling for reads will read also the insert - buffer in fil_system.sys_space, we have to be very careful not to - introduce deadlocks. We never close fil_system.sys_space data - files and never issue asynchronous reads of change buffer pages. */ - const page_id_t id(request.bpage->id()); + node->complete_write(); + node->space->release(); +} - if (dberr_t err= request.bpage->read_complete(*request.node)) - { - if (recv_recovery_is_on() && !srv_force_recovery) - { - mysql_mutex_lock(&recv_sys.mutex); - recv_sys.set_corrupt_fs(); - mysql_mutex_unlock(&recv_sys.mutex); - } +void IORequest::read_complete() const +{ + ut_ad(fil_validate_skip()); + ut_ad(node); + ut_ad(is_read()); + ut_ad(bpage); + + /* IMPORTANT: since i/o handling for reads will read also the insert + buffer in fil_system.sys_space, we have to be very careful not to + introduce deadlocks. We never close fil_system.sys_space data files + and never issue asynchronous reads of change buffer pages. */ + const page_id_t id(bpage->id()); - if (err != DB_FAIL) - ib::error() << "Failed to read page " << id.page_no() - << " from file '" << request.node->name << "': " << err; + if (dberr_t err= bpage->read_complete(*node)) + { + if (recv_recovery_is_on() && !srv_force_recovery) + { + mysql_mutex_lock(&recv_sys.mutex); + recv_sys.set_corrupt_fs(); + mysql_mutex_unlock(&recv_sys.mutex); } + + if (err != DB_FAIL) + ib::error() << "Failed to read page " << id.page_no() + << " from file '" << node->name << "': " << err; } - request.node->space->release(); + node->space->release(); } /** Flush to disk the writes in file spaces of the given type diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index 4295c3ba342..6d3ec65b1d3 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -75,8 +75,7 @@ struct buf_pool_info_t ulint flush_list_len; /*!< Length of buf_pool.flush_list */ ulint n_pend_unzip; /*!< buf_pool.n_pend_unzip, pages pending decompress */ - ulint n_pend_reads; /*!< buf_pool.n_pend_reads, pages - pending read */ + ulint n_pend_reads; /*!< os_aio_pending_reads() */ ulint n_pending_flush_lru; /*!< Pages pending flush in LRU */ ulint n_pending_flush_list; /*!< Pages pending flush in FLUSH LIST */ diff --git a/storage/innobase/include/buf0rea.h b/storage/innobase/include/buf0rea.h index 4ec8938c689..3dd085dda5c 100644 --- a/storage/innobase/include/buf0rea.h +++ b/storage/innobase/include/buf0rea.h @@ -102,10 +102,13 @@ which could result in a deadlock if the OS does not support asynchronous io. ulint buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf); -/** Issue read requests for pages that need to be recovered. -@param space_id tablespace identifier -@param page_nos page numbers to read, in ascending order */ -void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos); +/** Schedule a page for recovery. +@param space tablespace +@param page_id page identifier +@param recs log records +@param init page initialization, or nullptr if the page needs to be read */ +void buf_read_recover(fil_space_t *space, const page_id_t page_id, + page_recv_t &recs, recv_init *init); /** @name Modes used in read-ahead @{ */ /** read only pages belonging to the insert buffer tree */ diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h index e787d81e8c2..e642b501409 100644 --- a/storage/innobase/include/log0recv.h +++ b/storage/innobase/include/log0recv.h @@ -38,9 +38,9 @@ Created 9/20/1997 Heikki Tuuri #define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on) ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result)) -/** Apply any buffered redo log to a page that was just read from a data file. -@param[in,out] space tablespace -@param[in,out] bpage buffer pool page +/** Apply any buffered redo log to a page. +@param space tablespace +@param bpage buffer pool page @return whether the page was recovered correctly */ bool recv_recover_page(fil_space_t* space, buf_page_t* bpage); @@ -49,17 +49,6 @@ of first system tablespace page @return error code or DB_SUCCESS */ dberr_t recv_recovery_from_checkpoint_start(); -/** Whether to store redo log records in recv_sys.pages */ -enum store_t { - /** Do not store redo log records. */ - STORE_NO, - /** Store redo log records. */ - STORE_YES, - /** Store redo log records if the tablespace exists. */ - STORE_IF_EXISTS -}; - - /** Report an operation to create, delete, or rename a file during backup. @param[in] space_id tablespace identifier @param[in] type file operation redo log type @@ -125,21 +114,15 @@ struct recv_dblwr_t list pages; }; -/** the recovery state and buffered records for a page */ +/** recv_sys.pages entry; protected by recv_sys.mutex */ struct page_recv_t { - /** Recovery state; protected by recv_sys.mutex */ - enum - { - /** not yet processed */ - RECV_NOT_PROCESSED, - /** not processed; the page will be reinitialized */ - RECV_WILL_NOT_READ, - /** page is being read */ - RECV_BEING_READ, - /** log records are being applied on the page */ - RECV_BEING_PROCESSED - } state= RECV_NOT_PROCESSED; + /** Recovery status: 0=not in progress, 1=log is being applied, + -1=log has been applied and the entry may be erased. + Transitions from 1 to -1 are NOT protected by recv_sys.mutex. */ + Atomic_relaxed<int8_t> being_processed{0}; + /** Whether reading the page will be skipped */ + bool skip_read= false; /** Latest written byte offset when applying the log records. @see mtr_t::m_last_offset */ uint16_t last_offset= 1; @@ -162,6 +145,9 @@ struct page_recv_t head= recs; tail= recs; } + /** Remove the last records for the page + @param start_lsn start of the removed log */ + ATTRIBUTE_COLD void rewind(lsn_t start_lsn); /** @return the last log snippet */ const log_rec_t* last() const { return tail; } @@ -180,8 +166,8 @@ struct page_recv_t iterator begin() { return head; } iterator end() { return NULL; } bool empty() const { ut_ad(!head == !tail); return !head; } - /** Clear and free the records; @see recv_sys_t::alloc() */ - inline void clear(); + /** Clear and free the records; @see recv_sys_t::add() */ + void clear(); } log; /** Trim old log records for a page. @@ -190,21 +176,27 @@ struct page_recv_t inline bool trim(lsn_t start_lsn); /** Ignore any earlier redo log records for this page. */ inline void will_not_read(); - /** @return whether the log records for the page are being processed */ - bool is_being_processed() const { return state == RECV_BEING_PROCESSED; } +}; + +/** A page initialization operation that was parsed from the redo log */ +struct recv_init +{ + /** log sequence number of the page initialization */ + lsn_t lsn; + /** Whether btr_page_create() avoided a read of the page. + At the end of the last recovery batch, mark_ibuf_exist() + will mark pages for which this flag is set. */ + bool created; }; /** Recovery system data structure */ struct recv_sys_t { - /** mutex protecting apply_log_recs and page_recv_t::state */ - mysql_mutex_t mutex; + using init= recv_init; + + /** mutex protecting this as well as some of page_recv_t */ + alignas(CPU_LEVEL1_DCACHE_LINESIZE) mysql_mutex_t mutex; private: - /** condition variable for - !apply_batch_on || pages.empty() || found_corrupt_log || found_corrupt_fs */ - pthread_cond_t cond; - /** whether recv_apply_hashed_log_recs() is running */ - bool apply_batch_on; /** set when finding a corrupt log block or record, or there is a log parsing buffer overflow */ bool found_corrupt_log; @@ -226,6 +218,8 @@ public: size_t offset; /** log sequence number of the first non-parsed record */ lsn_t lsn; + /** log sequence number of the last parsed mini-transaction */ + lsn_t scanned_lsn; /** log sequence number at the end of the FILE_CHECKPOINT record, or 0 */ lsn_t file_checkpoint; /** the time when progress was last reported */ @@ -238,6 +232,9 @@ public: map pages; private: + /** iterator to pages, used by parse() */ + map::iterator pages_it; + /** Process a record that indicates that a tablespace size is being shrunk. @param page_id first page that is not in the file @param lsn log sequence number of the shrink operation */ @@ -257,30 +254,42 @@ public: /** The contents of the doublewrite buffer */ recv_dblwr_t dblwr; - /** Last added LSN to pages, before switching to STORE_NO */ - lsn_t last_stored_lsn= 0; - inline void read(os_offset_t offset, span<byte> buf); inline size_t files_size(); void close_files() { files.clear(); files.shrink_to_fit(); } + /** Advance pages_it if it matches the iterator */ + void pages_it_invalidate(const map::iterator &p) + { + mysql_mutex_assert_owner(&mutex); + if (pages_it == p) + pages_it++; + } + /** Invalidate pages_it if it points to the given tablespace */ + void pages_it_invalidate(uint32_t space_id) + { + mysql_mutex_assert_owner(&mutex); + if (pages_it != pages.end() && pages_it->first.space() == space_id) + pages_it= pages.end(); + } + private: /** Attempt to initialize a page based on redo log records. - @param page_id page identifier - @param p iterator pointing to page_id + @param p iterator @param mtr mini-transaction @param b pre-allocated buffer pool block + @param init page initialization @return the recovered block @retval nullptr if the page cannot be initialized based on log records @retval -1 if the page cannot be recovered due to corruption */ - inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p, - mtr_t &mtr, buf_block_t *b); + inline buf_block_t *recover_low(const map::iterator &p, mtr_t &mtr, + buf_block_t *b, init &init); /** Attempt to initialize a page based on redo log records. @param page_id page identifier @return the recovered block @retval nullptr if the page cannot be initialized based on log records @retval -1 if the page cannot be recovered due to corruption */ - buf_block_t *recover_low(const page_id_t page_id); + ATTRIBUTE_COLD buf_block_t *recover_low(const page_id_t page_id); /** All found log files (multiple ones are possible if we are upgrading from before MariaDB Server 10.5.1) */ @@ -289,10 +298,27 @@ private: /** Base node of the redo block list. List elements are linked via buf_block_t::unzip_LRU. */ UT_LIST_BASE_NODE_T(buf_block_t) blocks; + + /** Allocate a block from the buffer pool for recv_sys.pages */ + ATTRIBUTE_COLD buf_block_t *add_block(); + + /** Wait for buffer pool to become available. + @param pages number of buffer pool pages needed */ + ATTRIBUTE_COLD void wait_for_pool(size_t pages); + + /** Free log for processed pages. */ + void garbage_collect(); + + /** Apply a recovery batch. + @param space_id current tablespace identifier + @param space current tablespace + @param free_block spare buffer block + @param last_batch whether it is possible to write more redo log + @return whether the caller must provide a new free_block */ + bool apply_batch(uint32_t space_id, fil_space_t *&space, + buf_block_t *&free_block, bool last_batch); + public: - /** Check whether the number of read redo log blocks exceeds the maximum. - @return whether the memory is exhausted */ - inline bool is_memory_exhausted(); /** Apply buffered log to persistent data pages. @param last_batch whether it is possible to write more redo log */ void apply(bool last_batch); @@ -310,7 +336,7 @@ public: /** Clean up after create() */ void close(); - bool is_initialised() const { return last_stored_lsn != 0; } + bool is_initialised() const { return scanned_lsn != 0; } /** Find the latest checkpoint. @return error code or DB_SUCCESS */ @@ -321,60 +347,76 @@ public: @param start_lsn start LSN of the mini-transaction @param lsn @see mtr_t::commit_lsn() @param l redo log snippet - @param len length of l, in bytes */ - inline void add(map::iterator it, lsn_t start_lsn, lsn_t lsn, - const byte *l, size_t len); - - enum parse_mtr_result { OK, PREMATURE_EOF, GOT_EOF }; + @param len length of l, in bytes + @return whether we ran out of memory */ + bool add(map::iterator it, lsn_t start_lsn, lsn_t lsn, + const byte *l, size_t len); + + /** Parsing result */ + enum parse_mtr_result { + /** a record was successfully parsed */ + OK, + /** the log ended prematurely (need to read more) */ + PREMATURE_EOF, + /** the end of the log was reached */ + GOT_EOF, + /** parse<true>(l, false) ran out of memory */ + GOT_OOM + }; private: /** Parse and register one log_t::FORMAT_10_8 mini-transaction. - @param store whether to store the records - @param l log data source */ + @tparam store whether to store the records + @param l log data source + @param if_exists if store: whether to check if the tablespace exists */ + template<typename source,bool store> + inline parse_mtr_result parse(source &l, bool if_exists) noexcept; + + /** Rewind a mini-transaction when parse() runs out of memory. + @param l log data source + @param begin start of the mini-transaction */ template<typename source> - inline parse_mtr_result parse(store_t store, source& l) noexcept; + ATTRIBUTE_COLD void rewind(source &l, source &begin) noexcept; + + /** Report progress in terms of LSN or pages remaining */ + ATTRIBUTE_COLD void report_progress() const; public: /** Parse and register one log_t::FORMAT_10_8 mini-transaction, handling log_sys.is_pmem() buffer wrap-around. - @param store whether to store the records */ - static parse_mtr_result parse_mtr(store_t store) noexcept; + @tparam store whether to store the records + @param if_exists if store: whether to check if the tablespace exists */ + template<bool store> + static parse_mtr_result parse_mtr(bool if_exists) noexcept; /** Parse and register one log_t::FORMAT_10_8 mini-transaction, handling log_sys.is_pmem() buffer wrap-around. - @param store whether to store the records */ - static parse_mtr_result parse_pmem(store_t store) noexcept + @tparam store whether to store the records + @param if_exists if store: whether to check if the tablespace exists */ + template<bool store> + static parse_mtr_result parse_pmem(bool if_exists) noexcept #ifdef HAVE_PMEM ; #else - { return parse_mtr(store); } + { return parse_mtr<store>(if_exists); } #endif + /** Erase log records for a page. */ + void erase(map::iterator p); + /** Clear a fully processed set of stored redo log records. */ - inline void clear(); + void clear(); /** Determine whether redo log recovery progress should be reported. @param time the current time @return whether progress should be reported (the last report was at least 15 seconds ago) */ - bool report(time_t time) - { - if (time - progress_time < 15) - return false; - - progress_time= time; - return true; - } + bool report(time_t time); /** The alloc() memory alignment, in bytes */ static constexpr size_t ALIGNMENT= sizeof(size_t); - /** Allocate memory for log_rec_t - @param len allocation size, in bytes - @return pointer to len bytes of memory (never NULL) */ - inline void *alloc(size_t len); - /** Free a redo log snippet. - @param data buffer returned by alloc() */ + @param data buffer allocated in add() */ inline void free(const void *data); /** Remove records for a corrupted page. @@ -386,8 +428,6 @@ public: ATTRIBUTE_COLD void set_corrupt_fs(); /** Flag log file corruption during recovery. */ ATTRIBUTE_COLD void set_corrupt_log(); - /** Possibly finish a recovery batch. */ - inline void maybe_finish_batch(); /** @return whether data file corruption was found */ bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); } @@ -405,13 +445,14 @@ public: } /** Try to recover a tablespace that was not readable earlier - @param p iterator, initially pointing to page_id_t{space_id,0}; - the records will be freed and the iterator advanced + @param p iterator @param name tablespace file name @param free_block spare buffer block - @return whether recovery failed */ - bool recover_deferred(map::iterator &p, const std::string &name, - buf_block_t *&free_block); + @return recovered tablespace + @retval nullptr if recovery failed */ + fil_space_t *recover_deferred(const map::iterator &p, + const std::string &name, + buf_block_t *&free_block); }; /** The recovery system */ diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h index 13f9d3de3f8..54f7ceeb4c0 100644 --- a/storage/innobase/include/os0file.h +++ b/storage/innobase/include/os0file.h @@ -212,6 +212,10 @@ public: bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; } bool is_async() const { return (type & (READ_SYNC ^ READ_ASYNC)) != 0; } + void write_complete() const; + void read_complete() const; + void fake_read_complete(os_offset_t offset) const; + /** If requested, free storage space associated with a section of the file. @param off byte offset from the start (SEEK_SET) @param len size of the hole in bytes @@ -1040,6 +1044,11 @@ int os_aio_init(); Frees the asynchronous io system. */ void os_aio_free(); +/** Submit a fake read request during crash recovery. +@param type fake read request +@param offset additional context */ +void os_fake_read(const IORequest &type, os_offset_t offset); + /** Request a read or write. @param type I/O request @param buf buffer diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index 37a496725fc..4619786ee8d 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -738,7 +738,7 @@ static struct { retry: log_sys.latch.wr_unlock(); - bool fail= false; + fil_space_t *space= fil_system.sys_space; buf_block_t *free_block= buf_LRU_get_free_block(false); log_sys.latch.wr_lock(SRW_LOCK_CALL); mysql_mutex_lock(&recv_sys.mutex); @@ -755,11 +755,12 @@ retry: there were no buffered records. Either way, we must create a dummy tablespace with the latest known name, for dict_drop_index_tree(). */ + recv_sys.pages_it_invalidate(space_id); while (p != recv_sys.pages.end() && p->first.space() == space_id) { + ut_ad(!p->second.being_processed); recv_sys_t::map::iterator r= p++; - r->second.log.clear(); - recv_sys.pages.erase(r); + recv_sys.erase(r); } recv_spaces_t::iterator it{recv_spaces.find(space_id)}; if (it != recv_spaces.end()) @@ -782,11 +783,14 @@ retry: } } else - fail= recv_sys.recover_deferred(p, d->second.file_name, free_block); + space= recv_sys.recover_deferred(p, d->second.file_name, free_block); processed: - defers.erase(d++); - if (fail) + auto e= d++; + defers.erase(e); + if (!space) break; + if (space != fil_system.sys_space) + space->release(); if (free_block) continue; mysql_mutex_unlock(&recv_sys.mutex); @@ -797,7 +801,7 @@ processed: mysql_mutex_unlock(&recv_sys.mutex); if (free_block) buf_pool.free_block(free_block); - return fail; + return !space; } /** Create tablespace metadata for a data file that was initially @@ -905,28 +909,191 @@ free_space: } deferred_spaces; +/** Report an operation to create, delete, or rename a file during backup. +@param[in] space_id tablespace identifier +@param[in] type redo log type +@param[in] name file name (not NUL-terminated) +@param[in] len length of name, in bytes +@param[in] new_name new file name (NULL if not rename) +@param[in] new_len length of new_name, in bytes (0 if NULL) */ +void (*log_file_op)(uint32_t space_id, int type, + const byte* name, ulint len, + const byte* new_name, ulint new_len); + +void (*undo_space_trunc)(uint32_t space_id); + +void (*first_page_init)(uint32_t space_id); + +/** Information about initializing page contents during redo log processing. +FIXME: Rely on recv_sys.pages! */ +class mlog_init_t +{ + using map= std::map<const page_id_t, recv_init, + std::less<const page_id_t>, + ut_allocator<std::pair<const page_id_t, recv_init>>>; + /** Map of page initialization operations. + FIXME: Merge this to recv_sys.pages! */ + map inits; + + /** Iterator to the last add() or will_avoid_read(), for speeding up + will_avoid_read(). */ + map::iterator i; +public: + /** Constructor */ + mlog_init_t() : i(inits.end()) {} + + /** Record that a page will be initialized by the redo log. + @param page_id page identifier + @param lsn log sequence number + @return whether the state was changed */ + bool add(const page_id_t page_id, lsn_t lsn) + { + mysql_mutex_assert_owner(&recv_sys.mutex); + const recv_init init = { lsn, false }; + std::pair<map::iterator, bool> p= + inits.insert(map::value_type(page_id, init)); + ut_ad(!p.first->second.created); + if (p.second) return true; + if (p.first->second.lsn >= lsn) return false; + p.first->second = init; + i = p.first; + return true; + } + + /** Get the last stored lsn of the page id and its respective + init/load operation. + @param page_id page identifier + @return the latest page initialization; + not valid after releasing recv_sys.mutex. */ + recv_init &last(page_id_t page_id) + { + mysql_mutex_assert_owner(&recv_sys.mutex); + return inits.find(page_id)->second; + } + + /** Determine if a page will be initialized or freed after a time. + @param page_id page identifier + @param lsn log sequence number + @return whether page_id will be freed or initialized after lsn */ + bool will_avoid_read(page_id_t page_id, lsn_t lsn) + { + mysql_mutex_assert_owner(&recv_sys.mutex); + if (i != inits.end() && i->first == page_id) + return i->second.lsn > lsn; + i = inits.lower_bound(page_id); + return i != inits.end() && i->first == page_id && i->second.lsn > lsn; + } + + /** At the end of each recovery batch, reset the 'created' flags. */ + void reset() + { + mysql_mutex_assert_owner(&recv_sys.mutex); + ut_ad(recv_no_ibuf_operations); + for (map::value_type &i : inits) + i.second.created= false; + } + + /** During the last recovery batch, mark whether there exist + buffered changes for the pages that were initialized + by buf_page_create() and still reside in the buffer pool. */ + void mark_ibuf_exist() + { + mysql_mutex_assert_owner(&recv_sys.mutex); + + for (const map::value_type &i : inits) + if (i.second.created) + { + auto &chain= buf_pool.page_hash.cell_get(i.first.fold()); + page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain); + + hash_lock.lock_shared(); + buf_block_t *block= reinterpret_cast<buf_block_t*> + (buf_pool.page_hash.get(i.first, chain)); + bool got_latch= block && block->page.lock.x_lock_try(); + hash_lock.unlock_shared(); + + if (!block) + continue; + + uint32_t state; + + if (!got_latch) + { + mysql_mutex_lock(&buf_pool.mutex); + block= reinterpret_cast<buf_block_t*> + (buf_pool.page_hash.get(i.first, chain)); + if (!block) + { + mysql_mutex_unlock(&buf_pool.mutex); + continue; + } + + state= block->page.fix(); + mysql_mutex_unlock(&buf_pool.mutex); + if (state < buf_page_t::UNFIXED) + { + block->page.unfix(); + continue; + } + block->page.lock.x_lock(); + state= block->page.unfix(); + ut_ad(state < buf_page_t::READ_FIX); + if (state >= buf_page_t::UNFIXED && block->page.id() == i.first) + goto check_ibuf; + } + else + { + state= block->page.state(); + ut_ad(state >= buf_page_t::FREED); + ut_ad(state < buf_page_t::READ_FIX); + + if (state >= buf_page_t::UNFIXED) + { + check_ibuf: + mysql_mutex_unlock(&recv_sys.mutex); + if (ibuf_page_exists(block->page.id(), block->zip_size())) + block->page.set_ibuf_exist(); + mysql_mutex_lock(&recv_sys.mutex); + } + } + + block->page.lock.x_unlock(); + } + } + + /** Clear the data structure */ + void clear() { inits.clear(); i = inits.end(); } +}; + +static mlog_init_t mlog_init; + /** Try to recover a tablespace that was not readable earlier -@param p iterator, initially pointing to page_id_t{space_id,0}; - the records will be freed and the iterator advanced +@param p iterator to the page @param name tablespace file name @param free_block spare buffer block -@return whether recovery failed */ -bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, - const std::string &name, - buf_block_t *&free_block) +@return recovered tablespace +@retval nullptr if recovery failed */ +fil_space_t *recv_sys_t::recover_deferred(const recv_sys_t::map::iterator &p, + const std::string &name, + buf_block_t *&free_block) { mysql_mutex_assert_owner(&mutex); - const page_id_t first{p->first}; - ut_ad(first.space()); + ut_ad(p->first.space()); - recv_spaces_t::iterator it{recv_spaces.find(first.space())}; + recv_spaces_t::iterator it{recv_spaces.find(p->first.space())}; ut_ad(it != recv_spaces.end()); - if (!first.page_no() && p->second.state == page_recv_t::RECV_WILL_NOT_READ) + if (!p->first.page_no() && p->second.skip_read) { mtr_t mtr; - buf_block_t *block= recover_low(first, p, mtr, free_block); + ut_ad(!p->second.being_processed); + p->second.being_processed= 1; + init &init= mlog_init.last(p->first); + mysql_mutex_unlock(&mutex); + buf_block_t *block= recover_low(p, mtr, free_block, init); + mysql_mutex_lock(&mutex); + p->second.being_processed= -1; ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1)); free_block= nullptr; if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1))) @@ -939,10 +1106,7 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET); const uint32_t size= fsp_header_get_field(page, FSP_SIZE); - ut_ad(it != recv_spaces.end()); - - if (page_id_t{space_id, page_no} == first && size >= 4 && - it != recv_spaces.end() && + if (page_id_t{space_id, page_no} == p->first && size >= 4 && fil_space_t::is_valid_flags(flags, space_id) && fil_space_t::logical_size(flags) == srv_page_size) { @@ -996,10 +1160,10 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, } size_set: node->deferred= false; - space->release(); it->second.space= space; block->page.lock.x_unlock(); - return false; + p->second.being_processed= -1; + return space; } release_and_fail: @@ -1007,179 +1171,34 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, } fail: - ib::error() << "Cannot apply log to " << first + ib::error() << "Cannot apply log to " << p->first << " of corrupted file '" << name << "'"; - return true; + return nullptr; } -/** Report an operation to create, delete, or rename a file during backup. -@param[in] space_id tablespace identifier -@param[in] type redo log type -@param[in] name file name (not NUL-terminated) -@param[in] len length of name, in bytes -@param[in] new_name new file name (NULL if not rename) -@param[in] new_len length of new_name, in bytes (0 if NULL) */ -void (*log_file_op)(uint32_t space_id, int type, - const byte* name, ulint len, - const byte* new_name, ulint new_len); - -void (*undo_space_trunc)(uint32_t space_id); - -void (*first_page_init)(uint32_t space_id); - -/** Information about initializing page contents during redo log processing. -FIXME: Rely on recv_sys.pages! */ -class mlog_init_t -{ -public: - /** A page initialization operation that was parsed from - the redo log */ - struct init { - /** log sequence number of the page initialization */ - lsn_t lsn; - /** Whether btr_page_create() avoided a read of the page. - - At the end of the last recovery batch, mark_ibuf_exist() - will mark pages for which this flag is set. */ - bool created; - }; - -private: - typedef std::map<const page_id_t, init, - std::less<const page_id_t>, - ut_allocator<std::pair<const page_id_t, init> > > - map; - /** Map of page initialization operations. - FIXME: Merge this to recv_sys.pages! */ - map inits; -public: - /** Record that a page will be initialized by the redo log. - @param[in] page_id page identifier - @param[in] lsn log sequence number - @return whether the state was changed */ - bool add(const page_id_t page_id, lsn_t lsn) - { - mysql_mutex_assert_owner(&recv_sys.mutex); - const init init = { lsn, false }; - std::pair<map::iterator, bool> p = inits.insert( - map::value_type(page_id, init)); - ut_ad(!p.first->second.created); - if (p.second) return true; - if (p.first->second.lsn >= init.lsn) return false; - p.first->second = init; - return true; - } - - /** Get the last stored lsn of the page id and its respective - init/load operation. - @param[in] page_id page id - @param[in,out] init initialize log or load log - @return the latest page initialization; - not valid after releasing recv_sys.mutex. */ - init& last(page_id_t page_id) - { - mysql_mutex_assert_owner(&recv_sys.mutex); - return inits.find(page_id)->second; - } - - /** Determine if a page will be initialized or freed after a time. - @param page_id page identifier - @param lsn log sequence number - @return whether page_id will be freed or initialized after lsn */ - bool will_avoid_read(page_id_t page_id, lsn_t lsn) const - { - mysql_mutex_assert_owner(&recv_sys.mutex); - auto i= inits.find(page_id); - return i != inits.end() && i->second.lsn > lsn; - } - - /** At the end of each recovery batch, reset the 'created' flags. */ - void reset() - { - mysql_mutex_assert_owner(&recv_sys.mutex); - ut_ad(recv_no_ibuf_operations); - for (map::value_type& i : inits) { - i.second.created = false; - } - } - - /** On the last recovery batch, mark whether there exist - buffered changes for the pages that were initialized - by buf_page_create() and still reside in the buffer pool. - @param[in,out] mtr dummy mini-transaction */ - void mark_ibuf_exist(mtr_t& mtr) - { - mysql_mutex_assert_owner(&recv_sys.mutex); - mtr.start(); - - for (const map::value_type& i : inits) { - if (!i.second.created) { - continue; - } - if (buf_block_t* block = buf_page_get_low( - i.first, 0, RW_X_LATCH, nullptr, - BUF_GET_IF_IN_POOL, - &mtr, nullptr, false)) { - if (UNIV_LIKELY_NULL(block->page.zip.data)) { - switch (fil_page_get_type( - block->page.zip.data)) { - case FIL_PAGE_INDEX: - case FIL_PAGE_RTREE: - if (page_zip_decompress( - &block->page.zip, - block->page.frame, - true)) { - break; - } - ib::error() << "corrupted " - << block->page.id(); - } - } - if (recv_no_ibuf_operations) { - mtr.commit(); - mtr.start(); - continue; - } - mysql_mutex_unlock(&recv_sys.mutex); - if (ibuf_page_exists(block->page.id(), - block->zip_size())) { - block->page.set_ibuf_exist(); - } - mtr.commit(); - mtr.start(); - mysql_mutex_lock(&recv_sys.mutex); - } - } - - mtr.commit(); - clear(); - } - - /** Clear the data structure */ - void clear() { inits.clear(); } -}; - -static mlog_init_t mlog_init; - /** Process a record that indicates that a tablespace is being shrunk in size. @param page_id first page identifier that is not in the file @param lsn log sequence number of the shrink operation */ inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn) { - DBUG_ENTER("recv_sys_t::trim"); - DBUG_LOG("ib_log", - "discarding log beyond end of tablespace " - << page_id << " before LSN " << lsn); - mysql_mutex_assert_owner(&mutex); - for (recv_sys_t::map::iterator p = pages.lower_bound(page_id); - p != pages.end() && p->first.space() == page_id.space();) { - recv_sys_t::map::iterator r = p++; - if (r->second.trim(lsn)) { - pages.erase(r); - } - } - DBUG_VOID_RETURN; + DBUG_ENTER("recv_sys_t::trim"); + DBUG_LOG("ib_log", "discarding log beyond end of tablespace " + << page_id << " before LSN " << lsn); + mysql_mutex_assert_owner(&mutex); + if (pages_it != pages.end() && pages_it->first.space() == page_id.space()) + pages_it= pages.end(); + for (recv_sys_t::map::iterator p = pages.lower_bound(page_id); + p != pages.end() && p->first.space() == page_id.space();) + { + recv_sys_t::map::iterator r = p++; + if (r->second.trim(lsn)) + { + ut_ad(!r->second.being_processed); + pages.erase(r); + } + } + DBUG_VOID_RETURN; } inline void recv_sys_t::read(os_offset_t total_offset, span<byte> buf) @@ -1202,15 +1221,10 @@ inline size_t recv_sys_t::files_size() @param[in] space_id the tablespace ID @param[in] ftype FILE_MODIFY, FILE_DELETE, or FILE_RENAME @param[in] lsn lsn of the redo log -@param[in] store whether the redo log has to be stored */ +@param[in] if_exists whether to check if the tablespace exists */ static void fil_name_process(const char *name, ulint len, uint32_t space_id, - mfile_type_t ftype, lsn_t lsn, store_t store) + mfile_type_t ftype, lsn_t lsn, bool if_exists) { - if (srv_operation == SRV_OPERATION_BACKUP - || srv_operation == SRV_OPERATION_BACKUP_NO_DEFER) { - return; - } - ut_ad(srv_operation <= SRV_OPERATION_EXPORT_RESTORED || srv_operation == SRV_OPERATION_RESTORE || srv_operation == SRV_OPERATION_RESTORE_EXPORT); @@ -1321,7 +1335,7 @@ same_space: case FIL_LOAD_DEFER: /** Skip the deferred spaces when lsn is already processed */ - if (store != store_t::STORE_IF_EXISTS) { + if (!if_exists) { deferred_spaces.add( space_id, fname.name.c_str(), lsn); } @@ -1364,9 +1378,8 @@ void recv_sys_t::close() deferred_spaces.clear(); ut_d(mysql_mutex_unlock(&mutex)); - last_stored_lsn= 0; + scanned_lsn= 0; mysql_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); } recv_spaces.clear(); @@ -1381,34 +1394,34 @@ void recv_sys_t::create() ut_ad(this == &recv_sys); ut_ad(!is_initialised()); mysql_mutex_init(recv_sys_mutex_key, &mutex, nullptr); - pthread_cond_init(&cond, nullptr); apply_log_recs = false; - apply_batch_on = false; len = 0; offset = 0; lsn = 0; + scanned_lsn = 1; found_corrupt_log = false; found_corrupt_fs = false; file_checkpoint = 0; progress_time = time(NULL); + ut_ad(pages.empty()); + pages_it = pages.end(); recv_max_page_lsn = 0; memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces); - last_stored_lsn = 1; UT_LIST_INIT(blocks, &buf_block_t::unzip_LRU); } /** Clear a fully processed set of stored redo log records. */ -inline void recv_sys_t::clear() +void recv_sys_t::clear() { mysql_mutex_assert_owner(&mutex); apply_log_recs= false; - apply_batch_on= false; ut_ad(!after_apply || found_corrupt_fs || !UT_LIST_GET_LAST(blocks)); pages.clear(); + pages_it= pages.end(); for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; ) { @@ -1419,8 +1432,6 @@ inline void recv_sys_t::clear() buf_block_free(block); block= prev_block; } - - pthread_cond_broadcast(&cond); } /** Free most recovery data structures. */ @@ -1432,52 +1443,14 @@ void recv_sys_t::debug_free() recovery_on= false; pages.clear(); + pages_it= pages.end(); mysql_mutex_unlock(&mutex); } -inline void *recv_sys_t::alloc(size_t len) -{ - mysql_mutex_assert_owner(&mutex); - ut_ad(len); - ut_ad(len <= srv_page_size); - - buf_block_t *block= UT_LIST_GET_FIRST(blocks); - if (UNIV_UNLIKELY(!block)) - { -create_block: - block= buf_block_alloc(); - block->page.access_time= 1U << 16 | - ut_calc_align<uint16_t>(static_cast<uint16_t>(len), ALIGNMENT); - static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2"); - UT_LIST_ADD_FIRST(blocks, block); - MEM_MAKE_ADDRESSABLE(block->page.frame, len); - MEM_NOACCESS(block->page.frame + len, srv_page_size - len); - return my_assume_aligned<ALIGNMENT>(block->page.frame); - } - - size_t free_offset= static_cast<uint16_t>(block->page.access_time); - ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT)); - if (UNIV_UNLIKELY(!free_offset)) - { - ut_ad(srv_page_size == 65536); - goto create_block; - } - ut_ad(free_offset <= srv_page_size); - free_offset+= len; - - if (free_offset > srv_page_size) - goto create_block; - - block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 | - ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT); - MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - len, len); - return my_assume_aligned<ALIGNMENT>(block->page.frame + free_offset - len); -} - /** Free a redo log snippet. -@param data buffer returned by alloc() */ +@param data buffer allocated in add() */ inline void recv_sys_t::free(const void *data) { ut_ad(!ut_align_offset(data, ALIGNMENT)); @@ -1502,8 +1475,11 @@ inline void recv_sys_t::free(const void *data) ut_ad(block->page.state() == buf_page_t::MEMORY); ut_ad(static_cast<uint16_t>(block->page.access_time - 1) < srv_page_size); - ut_ad(block->page.access_time >= 1U << 16); - if (!((block->page.access_time -= 1U << 16) >> 16)) + unsigned a= block->page.access_time; + ut_ad(a >= 1U << 16); + a-= 1U << 16; + block->page.access_time= a; + if (!(a >> 16)) { UT_LIST_REMOVE(blocks, block); MEM_MAKE_ADDRESSABLE(block->page.frame, srv_page_size); @@ -1689,6 +1665,9 @@ dberr_t recv_sys_t::find_checkpoint() bool wrong_size= false; byte *buf; + ut_ad(pages.empty()); + pages_it= pages.end(); + if (files.empty()) { file_checkpoint= 0; @@ -1965,7 +1944,31 @@ inline bool page_recv_t::trim(lsn_t start_lsn) } -inline void page_recv_t::recs_t::clear() +void page_recv_t::recs_t::rewind(lsn_t start_lsn) +{ + mysql_mutex_assert_owner(&recv_sys.mutex); + log_phys_t *trim= static_cast<log_phys_t*>(head); + ut_ad(trim); + while (log_phys_t *next= static_cast<log_phys_t*>(trim->next)) + { + ut_ad(trim->start_lsn < start_lsn); + if (next->start_lsn == start_lsn) + break; + trim= next; + } + tail= trim; + log_rec_t *l= tail->next; + tail->next= nullptr; + while (l) + { + log_rec_t *next= l->next; + recv_sys.free(l); + l= next; + } +} + + +void page_recv_t::recs_t::clear() { mysql_mutex_assert_owner(&recv_sys.mutex); for (const log_rec_t *l= head; l; ) @@ -1977,33 +1980,99 @@ inline void page_recv_t::recs_t::clear() head= tail= nullptr; } - /** Ignore any earlier redo log records for this page. */ inline void page_recv_t::will_not_read() { - ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ); - state= RECV_WILL_NOT_READ; + ut_ad(!being_processed); + skip_read= true; log.clear(); } +void recv_sys_t::erase(map::iterator p) +{ + ut_ad(p->second.being_processed <= 0); + p->second.log.clear(); + pages.erase(p); +} + +/** Free log for processed pages. */ +void recv_sys_t::garbage_collect() +{ + mysql_mutex_assert_owner(&mutex); + + if (pages_it != pages.end() && pages_it->second.being_processed < 0) + pages_it= pages.end(); + + for (map::iterator p= pages.begin(); p != pages.end(); ) + { + if (p->second.being_processed < 0) + { + map::iterator r= p++; + erase(r); + } + else + p++; + } +} + +/** Allocate a block from the buffer pool for recv_sys.pages */ +ATTRIBUTE_COLD buf_block_t *recv_sys_t::add_block() +{ + for (bool freed= false;;) + { + const auto rs= UT_LIST_GET_LEN(blocks) * 2; + mysql_mutex_lock(&buf_pool.mutex); + const auto bs= + UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU); + if (UNIV_LIKELY(bs > BUF_LRU_MIN_LEN || rs < bs)) + { + buf_block_t *block= buf_LRU_get_free_block(true); + mysql_mutex_unlock(&buf_pool.mutex); + return block; + } + /* out of memory: redo log occupies more than 1/3 of buf_pool + and there are fewer than BUF_LRU_MIN_LEN pages left */ + mysql_mutex_unlock(&buf_pool.mutex); + if (freed) + return nullptr; + freed= true; + garbage_collect(); + } +} + +/** Wait for buffer pool to become available. */ +ATTRIBUTE_COLD void recv_sys_t::wait_for_pool(size_t pages) +{ + mysql_mutex_unlock(&mutex); + os_aio_wait_until_no_pending_reads(false); + mysql_mutex_lock(&mutex); + garbage_collect(); + mysql_mutex_lock(&buf_pool.mutex); + bool need_more= UT_LIST_GET_LEN(buf_pool.free) < pages; + mysql_mutex_unlock(&buf_pool.mutex); + if (need_more) + buf_flush_sync_batch(lsn); +} /** Register a redo log snippet for a page. @param it page iterator @param start_lsn start LSN of the mini-transaction @param lsn @see mtr_t::commit_lsn() @param l redo log snippet -@param len length of l, in bytes */ -inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn, - const byte *l, size_t len) +@param len length of l, in bytes +@return whether we ran out of memory */ +ATTRIBUTE_NOINLINE +bool recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn, + const byte *l, size_t len) { mysql_mutex_assert_owner(&mutex); - page_id_t page_id = it->first; page_recv_t &recs= it->second; + buf_block_t *block; switch (*l & 0x70) { case FREE_PAGE: case INIT_PAGE: recs.will_not_read(); - mlog_init.add(page_id, start_lsn); /* FIXME: remove this! */ + mlog_init.add(it->first, start_lsn); /* FIXME: remove this! */ /* fall through */ default: log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last()); @@ -2012,7 +2081,7 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn, if (tail->start_lsn != start_lsn) break; ut_ad(tail->lsn == lsn); - buf_block_t *block= UT_LIST_GET_LAST(blocks); + block= UT_LIST_GET_LAST(blocks); ut_ad(block); const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1; ut_ad(used >= ALIGNMENT); @@ -2025,7 +2094,7 @@ append: MEM_MAKE_ADDRESSABLE(end + 1, len); /* Append to the preceding record for the page */ tail->append(l, len); - return; + return false; } if (end <= &block->page.frame[used - ALIGNMENT] || &block->page.frame[used] >= end) @@ -2039,8 +2108,49 @@ append: ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT); goto append; } - recs.log.append(new (alloc(log_phys_t::alloc_size(len))) + + const size_t size{log_phys_t::alloc_size(len)}; + ut_ad(size <= srv_page_size); + void *buf; + block= UT_LIST_GET_FIRST(blocks); + if (UNIV_UNLIKELY(!block)) + { + create_block: + block= add_block(); + if (UNIV_UNLIKELY(!block)) + return true; + block->page.access_time= 1U << 16 | + ut_calc_align<uint16_t>(static_cast<uint16_t>(size), ALIGNMENT); + static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2"); + UT_LIST_ADD_FIRST(blocks, block); + MEM_MAKE_ADDRESSABLE(block->page.frame, size); + MEM_NOACCESS(block->page.frame + size, srv_page_size - size); + buf= block->page.frame; + } + else + { + size_t free_offset= static_cast<uint16_t>(block->page.access_time); + ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT)); + if (UNIV_UNLIKELY(!free_offset)) + { + ut_ad(srv_page_size == 65536); + goto create_block; + } + ut_ad(free_offset <= srv_page_size); + free_offset+= size; + + if (free_offset > srv_page_size) + goto create_block; + + block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 | + ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT); + MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - size, size); + buf= block->page.frame + free_offset - size; + } + + recs.log.append(new (my_assume_aligned<ALIGNMENT>(buf)) log_phys_t{start_lsn, lsn, l, len}); + return false; } /** Store/remove the freed pages in fil_name_t of recv_spaces. @@ -2304,13 +2414,84 @@ struct recv_ring : public recv_buf }; #endif -/** Parse and register one log_t::FORMAT_10_8 mini-transaction. -@param store whether to store the records -@param l log data source */ template<typename source> -inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) +void recv_sys_t::rewind(source &l, source &begin) noexcept +{ + ut_ad(srv_operation != SRV_OPERATION_BACKUP); + mysql_mutex_assert_owner(&mutex); + + const source end= l; + uint32_t rlen; + for (l= begin; !(l == end); l+= rlen) + { + const source recs{l}; + ++l; + const byte b= *recs; + + ut_ad(b > 1); + ut_ad(UNIV_LIKELY((b & 0x70) != RESERVED) || srv_force_recovery); + + rlen= b & 0xf; + if (!rlen) + { + const uint32_t lenlen= mlog_decode_varint_length(*l); + const uint32_t addlen= mlog_decode_varint(l); + ut_ad(addlen != MLOG_DECODE_ERROR); + rlen= addlen + 15 - lenlen; + l+= lenlen; + } + ut_ad(!l.is_eof(rlen)); + if (b & 0x80) + continue; + + uint32_t idlen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(idlen > 5 || idlen >= rlen)) + continue; + const uint32_t space_id= mlog_decode_varint(l); + if (UNIV_UNLIKELY(space_id == MLOG_DECODE_ERROR)) + continue; + l+= idlen; + rlen-= idlen; + idlen= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(idlen > 5 || idlen > rlen)) + continue; + const uint32_t page_no= mlog_decode_varint(l); + if (UNIV_UNLIKELY(page_no == MLOG_DECODE_ERROR)) + continue; + const page_id_t id{space_id, page_no}; + if (pages_it == pages.end() || pages_it->first != id) + { + pages_it= pages.find(id); + if (pages_it == pages.end()) + continue; + } + + ut_ad(!pages_it->second.being_processed); + const log_phys_t *head= + static_cast<log_phys_t*>(*pages_it->second.log.begin()); + if (!head || head->start_lsn == lsn) + { + erase(pages_it); + pages_it= pages.end(); + } + else + pages_it->second.log.rewind(lsn); + } + + l= begin; + pages_it= pages.end(); +} + +/** Parse and register one log_t::FORMAT_10_8 mini-transaction. +@tparam store whether to store the records +@param l log data source +@param if_exists if store: whether to check if the tablespace exists */ +template<typename source,bool store> +inline +recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists) noexcept { + restart: #ifndef SUX_LOCK_GENERIC ut_ad(log_sys.latch.is_write_locked() || srv_operation == SRV_OPERATION_BACKUP || @@ -2319,12 +2500,15 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) mysql_mutex_assert_owner(&mutex); ut_ad(log_sys.next_checkpoint_lsn); ut_ad(log_sys.is_latest()); + ut_ad(store || !if_exists); + ut_ad(store || + srv_operation != SRV_OPERATION_BACKUP || + srv_operation != SRV_OPERATION_BACKUP_NO_DEFER); alignas(8) byte iv[MY_AES_BLOCK_SIZE]; byte *decrypt_buf= static_cast<byte*>(alloca(srv_page_size)); const lsn_t start_lsn{lsn}; - map::iterator cached_pages_it{pages.end()}; /* Check that the entire mini-transaction is included within the buffer */ if (l.is_eof(0)) @@ -2333,7 +2517,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) if (*l <= 1) return GOT_EOF; /* We should never write an empty mini-transaction. */ - const source begin{l}; + source begin{l}; uint32_t rlen; for (uint32_t total_len= 0; !l.is_eof(); l+= rlen, total_len+= rlen) { @@ -2433,7 +2617,6 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) sql_print_error("InnoDB: Unknown log record at LSN " LSN_PF, lsn); corrupted: found_corrupt_log= true; - pthread_cond_broadcast(&cond); return GOT_EOF; } @@ -2510,13 +2693,13 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) mach_write_to_4(iv + 12, page_no); got_page_op= !(b & 0x80); if (!got_page_op); - else if (srv_operation == SRV_OPERATION_BACKUP) + else if (!store && srv_operation == SRV_OPERATION_BACKUP) { if (page_no == 0 && first_page_init && (b & 0x10)) first_page_init(space_id); continue; } - else if (file_checkpoint && !is_predefined_tablespace(space_id)) + else if (store && file_checkpoint && !is_predefined_tablespace(space_id)) { recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id); if (i != recv_spaces.end() && i->first == space_id); @@ -2585,7 +2768,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) trim({space_id, 0}, lsn); truncated_undo_spaces[space_id - srv_undo_space_id_start]= { lsn, page_no }; - if (undo_space_trunc) + if (!store && undo_space_trunc) undo_space_trunc(space_id); #endif last_offset= 1; /* the next record must not be same_page */ @@ -2626,7 +2809,7 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) { if (UNIV_UNLIKELY(rlen + last_offset > srv_page_size)) goto record_corrupted; - if (UNIV_UNLIKELY(!page_no) && file_checkpoint) + if (store && UNIV_UNLIKELY(!page_no) && file_checkpoint) { const bool has_size= last_offset <= FSP_HEADER_OFFSET + FSP_SIZE && last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SIZE + 4; @@ -2705,38 +2888,57 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE); } #endif - const bool is_init= (b & 0x70) <= INIT_PAGE; - switch (store) { - case STORE_IF_EXISTS: - if (fil_space_t *space= fil_space_t::get(space_id)) + if (store) + { + if (if_exists) { - const auto size= space->get_size(); - space->release(); - if (!size) + if (fil_space_t *space= fil_space_t::get(space_id)) + { + const auto size= space->get_size(); + space->release(); + if (!size) + continue; + } + else if (!deferred_spaces.find(space_id)) continue; } - else if (!deferred_spaces.find(space_id)) - continue; - /* fall through */ - case STORE_YES: if (!mlog_init.will_avoid_read(id, start_lsn)) { - if (cached_pages_it == pages.end() || - cached_pages_it->first != id) - cached_pages_it= pages.emplace(id, page_recv_t{}).first; - add(cached_pages_it, start_lsn, lsn, - l.get_buf(cl, recs, decrypt_buf), l - recs + rlen); + if (pages_it == pages.end() || pages_it->first != id) + pages_it= pages.emplace(id, page_recv_t{}).first; + if (UNIV_UNLIKELY(add(pages_it, start_lsn, lsn, + l.get_buf(cl, recs, decrypt_buf), + l - recs + rlen))) + { + lsn= start_lsn; + log_sys.set_recovered_lsn(start_lsn); + l+= rlen; + offset= begin.ptr - log_sys.buf; + rewind(l, begin); + if (if_exists) + { + apply(false); + if (is_corrupt_fs()) + return GOT_EOF; + goto restart; + } + sql_print_information("InnoDB: Multi-batch recovery needed at LSN " + LSN_PF, lsn); + return GOT_OOM; + } } - continue; - case STORE_NO: - if (!is_init) - continue; + } + else if ((b & 0x70) <= INIT_PAGE) + { mlog_init.add(id, start_lsn); - map::iterator i= pages.find(id); - if (i == pages.end()) - continue; - i->second.log.clear(); - pages.erase(i); + if (pages_it == pages.end() || pages_it->first != id) + { + pages_it= pages.find(id); + if (pages_it == pages.end()) + continue; + } + map::iterator r= pages_it++; + erase(r); } } else if (rlen) @@ -2749,6 +2951,11 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) if (rlen < UNIV_PAGE_SIZE_MAX && !l.is_zero(rlen)) continue; } + else if (store) + { + ut_ad(file_checkpoint); + continue; + } else if (const lsn_t c= l.read8()) { if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) @@ -2830,21 +3037,27 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) if (UNIV_UNLIKELY(!recv_needed_recovery && srv_read_only_mode)) continue; + if (!store && + (srv_operation == SRV_OPERATION_BACKUP || + srv_operation == SRV_OPERATION_BACKUP_NO_DEFER)) + { + if ((b & 0xf0) < FILE_CHECKPOINT && log_file_op) + log_file_op(space_id, b & 0xf0, + reinterpret_cast<const byte*>(fn), + static_cast<ulint>(fnend - fn), + reinterpret_cast<const byte*>(fn2), + fn2 ? static_cast<ulint>(fn2end - fn2) : 0); + continue; + } + fil_name_process(fn, fnend - fn, space_id, (b & 0xf0) == FILE_DELETE ? FILE_DELETE : FILE_MODIFY, - start_lsn, store); - - if ((b & 0xf0) < FILE_CHECKPOINT && log_file_op) - log_file_op(space_id, b & 0xf0, - reinterpret_cast<const byte*>(fn), - static_cast<ulint>(fnend - fn), - reinterpret_cast<const byte*>(fn2), - fn2 ? static_cast<ulint>(fn2end - fn2) : 0); + start_lsn, if_exists); if (fn2) { fil_name_process(fn2, fn2end - fn2, space_id, - FILE_RENAME, start_lsn, store); + FILE_RENAME, start_lsn, if_exists); if (file_checkpoint) { const size_t len= fn2end - fn2; @@ -2868,18 +3081,23 @@ inline recv_sys_t::parse_mtr_result recv_sys_t::parse(store_t store, source &l) return OK; } -ATTRIBUTE_NOINLINE -recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr(store_t store) noexcept +template<bool store> +recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr(bool if_exists) noexcept { recv_buf s{&log_sys.buf[recv_sys.offset]}; - return recv_sys.parse(store, s); + return recv_sys.parse<recv_buf,store>(s, if_exists); } +/** for mariadb-backup; @see xtrabackup_copy_logfile() */ +template +recv_sys_t::parse_mtr_result recv_sys_t::parse_mtr<false>(bool) noexcept; + #ifdef HAVE_PMEM -recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept +template<bool store> +recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(bool if_exists) noexcept { - recv_sys_t::parse_mtr_result r{parse_mtr(store)}; - if (r != PREMATURE_EOF || !log_sys.is_pmem()) + recv_sys_t::parse_mtr_result r{parse_mtr<store>(if_exists)}; + if (UNIV_LIKELY(r != PREMATURE_EOF) || !log_sys.is_pmem()) return r; ut_ad(recv_sys.len == log_sys.file_size); ut_ad(recv_sys.offset >= log_sys.START_OFFSET); @@ -2888,7 +3106,7 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept {recv_sys.offset == recv_sys.len ? &log_sys.buf[log_sys.START_OFFSET] : &log_sys.buf[recv_sys.offset]}; - return recv_sys.parse(store, s); + return recv_sys.parse<recv_ring,store>(s, if_exists); } #endif @@ -2896,23 +3114,22 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse_pmem(store_t store) noexcept lsn of a log record. @param[in,out] block buffer pool page @param[in,out] mtr mini-transaction -@param[in,out] p recovery address +@param[in,out] recs log records to apply @param[in,out] space tablespace, or NULL if not looked up yet @param[in,out] init page initialization operation, or NULL @return the recovered page @retval nullptr on failure */ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, - const recv_sys_t::map::iterator &p, - fil_space_t *space= nullptr, - mlog_init_t::init *init= nullptr) + page_recv_t &recs, + fil_space_t *space, + recv_init *init) { - mysql_mutex_assert_owner(&recv_sys.mutex); + mysql_mutex_assert_not_owner(&recv_sys.mutex); ut_ad(recv_sys.apply_log_recs); ut_ad(recv_needed_recovery); ut_ad(!init || init->created); ut_ad(!init || init->lsn); - ut_ad(block->page.id() == p->first); - ut_ad(!p->second.is_being_processed()); + ut_ad(recs.being_processed == 1); ut_ad(!space || space->id == block->page.id().space()); ut_ad(log_sys.is_latest()); @@ -2924,10 +3141,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, block->page.id().space(), block->page.id().page_no())); - p->second.state = page_recv_t::RECV_BEING_PROCESSED; - - mysql_mutex_unlock(&recv_sys.mutex); - byte *frame = UNIV_LIKELY_NULL(block->page.zip.data) ? block->page.zip.data : block->page.frame; @@ -2941,7 +3154,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, bool skipped_after_init = false; - for (const log_rec_t* recv : p->second.log) { + for (const log_rec_t* recv : recs.log) { const log_phys_t* l = static_cast<const log_phys_t*>(recv); ut_ad(l->lsn); ut_ad(end_lsn <= l->lsn); @@ -2999,8 +3212,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, block->page.id().space(), block->page.id().page_no())); - log_phys_t::apply_status a= l->apply(*block, - p->second.last_offset); + log_phys_t::apply_status a= l->apply(*block, recs.last_offset); switch (a) { case log_phys_t::APPLIED_NO: @@ -3123,26 +3335,11 @@ set_start_lsn: mtr.commit(); done: - time_t now = time(NULL); - - mysql_mutex_lock(&recv_sys.mutex); - + /* FIXME: do this in page read, protected with recv_sys.mutex! */ if (recv_max_page_lsn < page_lsn) { recv_max_page_lsn = page_lsn; } - ut_ad(!block || p->second.is_being_processed()); - ut_ad(!block || !recv_sys.pages.empty()); - - if (recv_sys.report(now)) { - const size_t n = recv_sys.pages.size(); - sql_print_information("InnoDB: To recover: %zu pages from log", - n); - service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, - "To recover: %zu pages" - " from log", n); - } - return block; } @@ -3156,146 +3353,347 @@ ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id) mysql_mutex_lock(&mutex); map::iterator p= pages.find(page_id); - if (p != pages.end()) + if (p == pages.end()) { - p->second.log.clear(); - pages.erase(p); - if (!srv_force_recovery) - { - set_corrupt_fs(); - ib::error() << "Unable to apply log to corrupted page " << page_id - << "; set innodb_force_recovery to ignore"; - } - else - ib::warn() << "Discarding log for corrupted page " << page_id; + mysql_mutex_unlock(&mutex); + return; } - if (pages.empty()) - pthread_cond_broadcast(&cond); + p->second.being_processed= -1; + if (!srv_force_recovery) + set_corrupt_fs(); mysql_mutex_unlock(&mutex); -} -/** Possibly finish a recovery batch. */ -inline void recv_sys_t::maybe_finish_batch() -{ - mysql_mutex_assert_owner(&mutex); - ut_ad(recovery_on); - if (!apply_batch_on || pages.empty() || is_corrupt_log() || is_corrupt_fs()) - pthread_cond_broadcast(&cond); + ib::error_or_warn(!srv_force_recovery) + << "Unable to apply log to corrupted page " << page_id; } ATTRIBUTE_COLD void recv_sys_t::set_corrupt_log() { mysql_mutex_lock(&mutex); found_corrupt_log= true; - pthread_cond_broadcast(&cond); mysql_mutex_unlock(&mutex); } ATTRIBUTE_COLD void recv_sys_t::set_corrupt_fs() { mysql_mutex_assert_owner(&mutex); + if (!srv_force_recovery) + sql_print_information("InnoDB: Set innodb_force_recovery=1" + " to ignore corrupted pages."); found_corrupt_fs= true; - pthread_cond_broadcast(&cond); } -/** Apply any buffered redo log to a page that was just read from a data file. -@param[in,out] space tablespace -@param[in,out] bpage buffer pool page +/** Apply any buffered redo log to a page. +@param space tablespace +@param bpage buffer pool page @return whether the page was recovered correctly */ bool recv_recover_page(fil_space_t* space, buf_page_t* bpage) { - mtr_t mtr; - mtr.start(); - mtr.set_log_mode(MTR_LOG_NO_REDO); - - ut_ad(bpage->frame); - /* Move the ownership of the x-latch on the page to - this OS thread, so that we can acquire a second - x-latch on it. This is needed for the operations to - the page to pass the debug checks. */ - bpage->lock.claim_ownership(); - bpage->lock.x_lock_recursive(); - bpage->fix_on_recovery(); - mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), - MTR_MEMO_PAGE_X_FIX); - - buf_block_t* success = reinterpret_cast<buf_block_t*>(bpage); + mtr_t mtr; + mtr.start(); + mtr.set_log_mode(MTR_LOG_NO_REDO); - mysql_mutex_lock(&recv_sys.mutex); - if (recv_sys.apply_log_recs) { - recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id()); - if (p != recv_sys.pages.end() - && !p->second.is_being_processed()) { - success = recv_recover_page(success, mtr, p, space); - if (UNIV_LIKELY(!!success)) { - p->second.log.clear(); - recv_sys.pages.erase(p); - } - recv_sys.maybe_finish_batch(); - goto func_exit; - } - } + ut_ad(bpage->frame); + /* Move the ownership of the x-latch on the page to this OS thread, + so that we can acquire a second x-latch on it. This is needed for + the operations to the page to pass the debug checks. */ + bpage->lock.claim_ownership(); + bpage->lock.x_lock_recursive(); + bpage->fix_on_recovery(); + mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX); - mtr.commit(); + buf_block_t *success= reinterpret_cast<buf_block_t*>(bpage); + + mysql_mutex_lock(&recv_sys.mutex); + if (recv_sys.apply_log_recs) + { + const page_id_t id{bpage->id()}; + recv_sys_t::map::iterator p= recv_sys.pages.find(id); + if (p == recv_sys.pages.end()); + else if (p->second.being_processed < 0) + { + recv_sys.pages_it_invalidate(p); + recv_sys.erase(p); + } + else + { + p->second.being_processed= 1; + recv_sys_t::init *init= nullptr; + if (p->second.skip_read) + (init= &mlog_init.last(id))->created= true; + mysql_mutex_unlock(&recv_sys.mutex); + success= recv_recover_page(success, mtr, p->second, space, init); + p->second.being_processed= -1; + goto func_exit; + } + } + + mysql_mutex_unlock(&recv_sys.mutex); + mtr.commit(); func_exit: - mysql_mutex_unlock(&recv_sys.mutex); - ut_ad(mtr.has_committed()); - return success; + ut_ad(mtr.has_committed()); + return success; } -/** Read pages for which log needs to be applied. -@param page_id first page identifier to read -@param i iterator to recv_sys.pages */ -TRANSACTIONAL_TARGET -static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i) +void IORequest::fake_read_complete(os_offset_t offset) const { - uint32_t page_nos[32]; - ut_ad(page_id == i->first); - page_id.set_page_no(ut_2pow_round(page_id.page_no(), 32U)); - const page_id_t up_limit{page_id + 31}; - uint32_t* p= page_nos; + ut_ad(node); + ut_ad(is_read()); + ut_ad(bpage); + ut_ad(bpage->frame); + ut_ad(recv_recovery_is_on()); + ut_ad(offset); + + mtr_t mtr; + mtr.start(); + mtr.set_log_mode(MTR_LOG_NO_REDO); - for (; i != recv_sys.pages.end() && i->first <= up_limit; i++) + ut_ad(bpage->frame); + /* Move the ownership of the x-latch on the page to this OS thread, + so that we can acquire a second x-latch on it. This is needed for + the operations to the page to pass the debug checks. */ + bpage->lock.claim_ownership(); + bpage->lock.x_lock_recursive(); + bpage->fix_on_recovery(); + mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX); + + page_recv_t &recs= *reinterpret_cast<page_recv_t*>(slot); + ut_ad(recs.being_processed == 1); + recv_init &init= *reinterpret_cast<recv_init*>(offset); + ut_ad(init.lsn > 1); + init.created= true; + + if (recv_recover_page(reinterpret_cast<buf_block_t*>(bpage), + mtr, recs, node->space, &init)) { - if (i->second.state == page_recv_t::RECV_NOT_PROCESSED) + ut_ad(bpage->oldest_modification() || bpage->is_freed()); + bpage->lock.x_unlock(true); + } + recs.being_processed= -1; + ut_ad(mtr.has_committed()); + + node->space->release(); +} + +/** @return whether a page has been freed */ +inline bool fil_space_t::is_freed(uint32_t page) +{ + std::lock_guard<std::mutex> freed_lock(freed_range_mutex); + return freed_ranges.contains(page); +} + +bool recv_sys_t::report(time_t time) +{ + if (time - progress_time < 15) + return false; + progress_time= time; + return true; +} + +ATTRIBUTE_COLD +void recv_sys_t::report_progress() const +{ + mysql_mutex_assert_owner(&mutex); + const size_t n{pages.size()}; + if (recv_sys.scanned_lsn == recv_sys.lsn) + { + sql_print_information("InnoDB: To recover: %zu pages", n); + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "To recover: %zu pages", n); + } + else + { + sql_print_information("InnoDB: To recover: LSN " LSN_PF + "/" LSN_PF "; %zu pages", + recv_sys.lsn, recv_sys.scanned_lsn, n); + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "To recover: LSN " LSN_PF + "/" LSN_PF "; %zu pages", + recv_sys.lsn, recv_sys.scanned_lsn, n); + } +} + +/** Apply a recovery batch. +@param space_id current tablespace identifier +@param space current tablespace +@param free_block spare buffer block +@param last_batch whether it is possible to write more redo log +@return whether the caller must provide a new free_block */ +bool recv_sys_t::apply_batch(uint32_t space_id, fil_space_t *&space, + buf_block_t *&free_block, bool last_batch) +{ + mysql_mutex_assert_owner(&mutex); + ut_ad(pages_it != pages.end()); + ut_ad(!pages_it->second.log.empty()); + + mysql_mutex_lock(&buf_pool.mutex); + size_t n= 0, max_n= std::min<size_t>(BUF_LRU_MIN_LEN, + UT_LIST_GET_LEN(buf_pool.LRU) + + UT_LIST_GET_LEN(buf_pool.free)); + mysql_mutex_unlock(&buf_pool.mutex); + + map::iterator begin= pages.end(); + page_id_t begin_id{~0ULL}; + + while (pages_it != pages.end() && n < max_n) + { + ut_ad(!buf_dblwr.is_inside(pages_it->first)); + if (!pages_it->second.being_processed) { - i->second.state= page_recv_t::RECV_BEING_READ; - *p++= i->first.page_no(); + if (space_id != pages_it->first.space()) + { + space_id= pages_it->first.space(); + if (space) + space->release(); + space= fil_space_t::get(space_id); + if (!space) + { + auto d= deferred_spaces.defers.find(space_id); + if (d == deferred_spaces.defers.end() || d->second.deleted) + /* For deleted files we preserve the deferred_spaces entry */; + else if (!free_block) + return true; + else + { + space= recover_deferred(pages_it, d->second.file_name, free_block); + deferred_spaces.defers.erase(d); + if (!space && !srv_force_recovery) + { + set_corrupt_fs(); + return false; + } + } + } + } + if (!space || space->is_freed(pages_it->first.page_no())) + pages_it->second.being_processed= -1; + else if (!n++) + { + begin= pages_it; + begin_id= pages_it->first; + } } + pages_it++; } - if (p != page_nos) + if (!last_batch) + log_sys.latch.wr_unlock(); + + pages_it= begin; + + if (report(time(nullptr))) + report_progress(); + + if (!n) + goto wait; + + mysql_mutex_lock(&buf_pool.mutex); + + if (UNIV_UNLIKELY(UT_LIST_GET_LEN(buf_pool.free) < n)) { - mysql_mutex_unlock(&recv_sys.mutex); - buf_read_recv_pages(page_id.space(), {page_nos, p}); - mysql_mutex_lock(&recv_sys.mutex); + mysql_mutex_unlock(&buf_pool.mutex); + wait: + wait_for_pool(n); + if (n); + else if (!last_batch) + goto unlock_relock; + else + goto get_last; + pages_it= pages.lower_bound(begin_id); + ut_ad(pages_it != pages.end()); } + else + mysql_mutex_unlock(&buf_pool.mutex); + + while (pages_it != pages.end()) + { + ut_ad(!buf_dblwr.is_inside(pages_it->first)); + if (!pages_it->second.being_processed) + { + const page_id_t id{pages_it->first}; + + if (space_id != id.space()) + { + space_id= id.space(); + if (space) + space->release(); + space= fil_space_t::get(space_id); + } + if (!space) + { + const auto it= deferred_spaces.defers.find(space_id); + if (it != deferred_spaces.defers.end() && !it->second.deleted) + /* The records must be processed after recover_deferred(). */ + goto next; + goto space_not_found; + } + else if (space->is_freed(id.page_no())) + { + space_not_found: + pages_it->second.being_processed= -1; + goto next; + } + else + { + page_recv_t &recs= pages_it->second; + ut_ad(!recs.log.empty()); + recs.being_processed= 1; + init *init= recs.skip_read ? &mlog_init.last(id) : nullptr; + mysql_mutex_unlock(&mutex); + buf_read_recover(space, id, recs, init); + } + + if (!--n) + { + if (last_batch) + goto relock_last; + goto relock; + } + mysql_mutex_lock(&mutex); + pages_it= pages.lower_bound(id); + } + else + next: + pages_it++; + } + + if (!last_batch) + { + unlock_relock: + mysql_mutex_unlock(&mutex); + relock: + log_sys.latch.wr_lock(SRW_LOCK_CALL); + relock_last: + mysql_mutex_lock(&mutex); + get_last: + pages_it= pages.lower_bound(begin_id); + } + + return false; } /** Attempt to initialize a page based on redo log records. -@param page_id page identifier -@param p iterator pointing to page_id +@param p iterator @param mtr mini-transaction @param b pre-allocated buffer pool block +@param init page initialization @return the recovered block @retval nullptr if the page cannot be initialized based on log records @retval -1 if the page cannot be recovered due to corruption */ -inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id, - map::iterator &p, mtr_t &mtr, - buf_block_t *b) +inline buf_block_t *recv_sys_t::recover_low(const map::iterator &p, mtr_t &mtr, + buf_block_t *b, init &init) { - mysql_mutex_assert_owner(&mutex); - ut_ad(p->first == page_id); + mysql_mutex_assert_not_owner(&mutex); page_recv_t &recs= p->second; - ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ); + ut_ad(recs.skip_read); + ut_ad(recs.being_processed == 1); buf_block_t* block= nullptr; - mlog_init_t::init &i= mlog_init.last(page_id); const lsn_t end_lsn= recs.log.last()->lsn; - if (end_lsn < i.lsn) - DBUG_LOG("ib_log", "skip log for page " << page_id - << " LSN " << end_lsn << " < " << i.lsn); - fil_space_t *space= fil_space_t::get(page_id.space()); + if (end_lsn < init.lsn) + DBUG_LOG("ib_log", "skip log for page " << p->first + << " LSN " << end_lsn << " < " << init.lsn); + fil_space_t *space= fil_space_t::get(p->first.space()); mtr.start(); mtr.set_log_mode(MTR_LOG_NO_REDO); @@ -3304,82 +3702,77 @@ inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id, if (!space) { - if (page_id.page_no() != 0) + if (p->first.page_no() != 0) { nothing_recoverable: mtr.commit(); return nullptr; } - auto it= recv_spaces.find(page_id.space()); + auto it= recv_spaces.find(p->first.space()); ut_ad(it != recv_spaces.end()); uint32_t flags= it->second.flags; zip_size= fil_space_t::zip_size(flags); - block= buf_page_create_deferred(page_id.space(), zip_size, &mtr, b); + block= buf_page_create_deferred(p->first.space(), zip_size, &mtr, b); ut_ad(block == b); block->page.lock.x_lock_recursive(); } else { - block= buf_page_create(space, page_id.page_no(), zip_size, &mtr, b); + block= buf_page_create(space, p->first.page_no(), zip_size, &mtr, b); if (UNIV_UNLIKELY(block != b)) { /* The page happened to exist in the buffer pool, or it was just being read in. Before the exclusive page latch was acquired by buf_page_create(), all changes to the page must have been applied. */ - ut_ad(pages.find(page_id) == pages.end()); + ut_d(mysql_mutex_lock(&mutex)); + ut_ad(pages.find(p->first) == pages.end()); + ut_d(mysql_mutex_unlock(&mutex)); space->release(); goto nothing_recoverable; } } - ut_ad(&recs == &pages.find(page_id)->second); - i.created= true; - map::iterator r= p++; - block= recv_recover_page(block, mtr, r, space, &i); + ut_d(mysql_mutex_lock(&mutex)); + ut_ad(&recs == &pages.find(p->first)->second); + ut_d(mysql_mutex_unlock(&mutex)); + init.created= true; + block= recv_recover_page(block, mtr, recs, space, &init); ut_ad(mtr.has_committed()); - if (block) - { - recs.log.clear(); - pages.erase(r); - } - else - block= reinterpret_cast<buf_block_t*>(-1); - - if (pages.empty()) - pthread_cond_signal(&cond); - if (space) space->release(); - return block; + return block ? block : reinterpret_cast<buf_block_t*>(-1); } /** Attempt to initialize a page based on redo log records. @param page_id page identifier @return recovered block @retval nullptr if the page cannot be initialized based on log records */ -buf_block_t *recv_sys_t::recover_low(const page_id_t page_id) +ATTRIBUTE_COLD buf_block_t *recv_sys_t::recover_low(const page_id_t page_id) { - buf_block_t *free_block= buf_LRU_get_free_block(false); - buf_block_t *block= nullptr; - mysql_mutex_lock(&mutex); map::iterator p= pages.find(page_id); - if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ) + if (p != pages.end() && !p->second.being_processed && p->second.skip_read) { + p->second.being_processed= 1; + init &init= mlog_init.last(page_id); + mysql_mutex_unlock(&mutex); + buf_block_t *free_block= buf_LRU_get_free_block(false); mtr_t mtr; - block= recover_low(page_id, p, mtr, free_block); + buf_block_t *block= recover_low(p, mtr, free_block, init); + p->second.being_processed= -1; ut_ad(!block || block == reinterpret_cast<buf_block_t*>(-1) || block == free_block); + if (UNIV_UNLIKELY(!block)) + buf_pool.free_block(free_block); + return block; } mysql_mutex_unlock(&mutex); - if (UNIV_UNLIKELY(!block)) - buf_pool.free_block(free_block); - return block; + return nullptr; } inline fil_space_t *fil_system_t::find(const char *path) const @@ -3427,45 +3820,18 @@ void recv_sys_t::apply(bool last_batch) mysql_mutex_assert_owner(&mutex); - timespec abstime; - - while (apply_batch_on) - { - if (is_corrupt_log()) - return; - if (last_batch) - my_cond_wait(&cond, &mutex.m_mutex); - else - { -#ifndef SUX_LOCK_GENERIC - ut_ad(log_sys.latch.is_write_locked()); -#endif - log_sys.latch.wr_unlock(); - set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */ - my_cond_timedwait(&cond, &mutex.m_mutex, &abstime); - mysql_mutex_unlock(&mutex); - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); - } - } - - recv_no_ibuf_operations = !last_batch || - srv_operation == SRV_OPERATION_RESTORE || - srv_operation == SRV_OPERATION_RESTORE_EXPORT; - - mtr_t mtr; + garbage_collect(); if (!pages.empty()) { - const char *msg= last_batch - ? "Starting final batch to recover" - : "Starting a batch to recover"; - const size_t n= pages.size(); - sql_print_information("InnoDB: %s %zu pages from redo log.", msg, n); - sd_notifyf(0, "STATUS=%s %zu pages from redo log", msg, n); + recv_no_ibuf_operations = !last_batch || + srv_operation == SRV_OPERATION_RESTORE || + srv_operation == SRV_OPERATION_RESTORE_EXPORT; + ut_ad(!last_batch || lsn == scanned_lsn); + progress_time= time(nullptr); + report_progress(); apply_log_recs= true; - apply_batch_on= true; for (auto id= srv_undo_tablespaces_open; id--;) { @@ -3491,132 +3857,71 @@ void recv_sys_t::apply(bool last_batch) fil_system.extend_to_recv_size(); - /* We must release log_sys.latch and recv_sys.mutex before - invoking buf_LRU_get_free_block(). Allocating a block may initiate - a redo log write and therefore acquire log_sys.latch. To avoid - deadlocks, log_sys.latch must not be acquired while holding - recv_sys.mutex. */ - mysql_mutex_unlock(&mutex); - if (!last_batch) - log_sys.latch.wr_unlock(); - - buf_block_t *free_block= buf_LRU_get_free_block(false); + fil_space_t *space= nullptr; + uint32_t space_id= ~0; + buf_block_t *free_block= nullptr; - if (!last_batch) - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); - - for (map::iterator p= pages.begin(); p != pages.end(); ) + for (pages_it= pages.begin(); pages_it != pages.end(); + pages_it= pages.begin()) { - const page_id_t page_id= p->first; - ut_ad(!p->second.log.empty()); + if (!free_block) + { + if (!last_batch) + log_sys.latch.wr_unlock(); + wait_for_pool(1); + pages_it= pages.begin(); + mysql_mutex_unlock(&mutex); + /* We must release log_sys.latch and recv_sys.mutex before + invoking buf_LRU_get_free_block(). Allocating a block may initiate + a redo log write and therefore acquire log_sys.latch. To avoid + deadlocks, log_sys.latch must not be acquired while holding + recv_sys.mutex. */ + free_block= buf_LRU_get_free_block(false); + if (!last_batch) + log_sys.latch.wr_lock(SRW_LOCK_CALL); + mysql_mutex_lock(&mutex); + pages_it= pages.begin(); + } - const uint32_t space_id= page_id.space(); - auto d= deferred_spaces.defers.find(space_id); - if (d != deferred_spaces.defers.end()) + while (pages_it != pages.end()) { - if (d->second.deleted) + if (is_corrupt_fs() || is_corrupt_log()) { - /* For deleted files we must preserve the entry in deferred_spaces */ -erase_for_space: - while (p != pages.end() && p->first.space() == space_id) + if (space) + space->release(); + if (free_block) { - map::iterator r= p++; - r->second.log.clear(); - pages.erase(r); + mysql_mutex_unlock(&mutex); + mysql_mutex_lock(&buf_pool.mutex); + buf_LRU_block_free_non_file_page(free_block); + mysql_mutex_unlock(&buf_pool.mutex); + mysql_mutex_lock(&mutex); } + return; } - else if (recover_deferred(p, d->second.file_name, free_block)) - { - if (!srv_force_recovery) - set_corrupt_fs(); - deferred_spaces.defers.erase(d); - goto erase_for_space; - } - else - deferred_spaces.defers.erase(d); - if (!free_block) - goto next_free_block; - p= pages.lower_bound(page_id); - continue; - } - - switch (p->second.state) { - case page_recv_t::RECV_BEING_READ: - case page_recv_t::RECV_BEING_PROCESSED: - p++; - continue; - case page_recv_t::RECV_WILL_NOT_READ: - if (UNIV_LIKELY(!!recover_low(page_id, p, mtr, free_block))) - { -next_free_block: - mysql_mutex_unlock(&mutex); - if (!last_batch) - log_sys.latch.wr_unlock(); - free_block= buf_LRU_get_free_block(false); - if (!last_batch) - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); + if (apply_batch(space_id, space, free_block, last_batch)) break; - } - ut_ad(p == pages.end() || p->first > page_id); - continue; - case page_recv_t::RECV_NOT_PROCESSED: - recv_read_in_area(page_id, p); } - p= pages.lower_bound(page_id); - /* Ensure that progress will be made. */ - ut_ad(p == pages.end() || p->first > page_id || - p->second.state >= page_recv_t::RECV_BEING_READ); } - buf_pool.free_block(free_block); + if (space) + space->release(); - /* Wait until all the pages have been processed */ - for (;;) + if (free_block) { - const bool empty= pages.empty(); - if (empty && !os_aio_pending_reads()) - break; - - if (!is_corrupt_fs() && !is_corrupt_log()) - { - if (last_batch) - { - if (!empty) - my_cond_wait(&cond, &mutex.m_mutex); - else - { - mysql_mutex_unlock(&mutex); - os_aio_wait_until_no_pending_reads(false); - mysql_mutex_lock(&mutex); - ut_ad(pages.empty()); - } - } - else - { -#ifndef SUX_LOCK_GENERIC - ut_ad(log_sys.latch.is_write_locked()); -#endif - log_sys.latch.wr_unlock(); - set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */ - my_cond_timedwait(&cond, &mutex.m_mutex, &abstime); - mysql_mutex_unlock(&mutex); - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); - } - continue; - } - if (is_corrupt_fs() && !srv_force_recovery) - sql_print_information("InnoDB: Set innodb_force_recovery=1" - " to ignore corrupted pages."); - return; + mysql_mutex_lock(&buf_pool.mutex); + buf_LRU_block_free_non_file_page(free_block); + mysql_mutex_unlock(&buf_pool.mutex); } } if (last_batch) - /* We skipped this in buf_page_create(). */ - mlog_init.mark_ibuf_exist(mtr); + { + if (!recv_no_ibuf_operations) + /* We skipped this in buf_page_create(). */ + mlog_init.mark_ibuf_exist(); + mlog_init.clear(); + } else { mlog_init.reset(); @@ -3625,21 +3930,22 @@ next_free_block: mysql_mutex_unlock(&mutex); - if (last_batch && srv_operation != SRV_OPERATION_RESTORE && - srv_operation != SRV_OPERATION_RESTORE_EXPORT) - /* Instead of flushing, last_batch sorts the buf_pool.flush_list - in ascending order of buf_page_t::oldest_modification. */ - log_sort_flush_list(); - else - buf_flush_sync_batch(lsn); - if (!last_batch) { + buf_flush_sync_batch(lsn); buf_pool_invalidate(); log_sys.latch.wr_lock(SRW_LOCK_CALL); } + else if (srv_operation == SRV_OPERATION_RESTORE || + srv_operation == SRV_OPERATION_RESTORE_EXPORT) + buf_flush_sync_batch(lsn); + else + /* Instead of flushing, last_batch sorts the buf_pool.flush_list + in ascending order of buf_page_t::oldest_modification. */ + log_sort_flush_list(); + #ifdef HAVE_PMEM - else if (log_sys.is_pmem()) + if (last_batch && log_sys.is_pmem()) mprotect(log_sys.buf, len, PROT_READ | PROT_WRITE); #endif @@ -3649,35 +3955,24 @@ next_free_block: clear(); } -/** Check whether the number of read redo log blocks exceeds the maximum. -@return whether the memory is exhausted */ -inline bool recv_sys_t::is_memory_exhausted() -{ - if (UT_LIST_GET_LEN(blocks) * 3 < buf_pool.get_n_pages()) - return false; - DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF - " last stored offset %zu\n", lsn, offset)); - return true; -} - /** Scan log_t::FORMAT_10_8 log store records to the parsing buffer. @param last_phase whether changes can be applied to the tablespaces @return whether rescan is needed (not everything was stored) */ static bool recv_scan_log(bool last_phase) { DBUG_ENTER("recv_scan_log"); - DBUG_ASSERT(!last_phase || recv_sys.file_checkpoint); ut_ad(log_sys.is_latest()); const size_t block_size_1{log_sys.get_block_size() - 1}; mysql_mutex_lock(&recv_sys.mutex); - recv_sys.clear(); ut_d(recv_sys.after_apply= last_phase); - ut_ad(!last_phase || recv_sys.file_checkpoint); + if (!last_phase) + recv_sys.clear(); + else + ut_ad(recv_sys.file_checkpoint); - store_t store= last_phase - ? STORE_IF_EXISTS : recv_sys.file_checkpoint ? STORE_YES : STORE_NO; + bool store{recv_sys.file_checkpoint != 0}; size_t buf_size= log_sys.buf_size; #ifdef HAVE_PMEM if (log_sys.is_pmem()) @@ -3694,6 +3989,7 @@ static bool recv_scan_log(bool last_phase) recv_sys.len= 0; } + lsn_t rewound_lsn= 0; for (ut_d(lsn_t source_offset= 0);;) { #ifndef SUX_LOCK_GENERIC @@ -3741,27 +4037,29 @@ static bool recv_scan_log(bool last_phase) if (UNIV_UNLIKELY(!recv_needed_recovery)) { - ut_ad(store == (recv_sys.file_checkpoint ? STORE_YES : STORE_NO)); + ut_ad(!last_phase); ut_ad(recv_sys.lsn >= log_sys.next_checkpoint_lsn); - for (;;) + if (!store) { - const byte& b{log_sys.buf[recv_sys.offset]}; - r= recv_sys.parse_pmem(store); - if (r == recv_sys_t::OK) + ut_ad(!recv_sys.file_checkpoint); + for (;;) { - if (store == STORE_NO && - (b == FILE_CHECKPOINT + 2 + 8 || (b & 0xf0) == FILE_MODIFY)) - continue; - } - else if (r == recv_sys_t::PREMATURE_EOF) - goto read_more; - else if (store != STORE_NO) - break; + const byte& b{log_sys.buf[recv_sys.offset]}; + r= recv_sys.parse_pmem<false>(false); + switch (r) { + case recv_sys_t::PREMATURE_EOF: + goto read_more; + default: + ut_ad(r == recv_sys_t::GOT_EOF); + break; + case recv_sys_t::OK: + if (b == FILE_CHECKPOINT + 2 + 8 || (b & 0xf0) == FILE_MODIFY) + continue; + } - if (store == STORE_NO) - { const lsn_t end{recv_sys.file_checkpoint}; + ut_ad(!end || end == recv_sys.lsn); mysql_mutex_unlock(&recv_sys.mutex); if (!end) @@ -3771,45 +4069,73 @@ static bool recv_scan_log(bool last_phase) ") at " LSN_PF, log_sys.next_checkpoint_lsn, recv_sys.lsn); } - else - ut_ad(end == recv_sys.lsn); DBUG_RETURN(true); } - - recv_needed_recovery= true; - if (srv_read_only_mode) - { - mysql_mutex_unlock(&recv_sys.mutex); - DBUG_RETURN(false); + } + else + { + ut_ad(recv_sys.file_checkpoint != 0); + switch ((r= recv_sys.parse_pmem<true>(false))) { + case recv_sys_t::PREMATURE_EOF: + goto read_more; + case recv_sys_t::GOT_EOF: + break; + default: + ut_ad(r == recv_sys_t::OK); + recv_needed_recovery= true; + if (srv_read_only_mode) + { + mysql_mutex_unlock(&recv_sys.mutex); + DBUG_RETURN(false); + } + sql_print_information("InnoDB: Starting crash recovery from" + " checkpoint LSN=" LSN_PF, + log_sys.next_checkpoint_lsn); } - sql_print_information("InnoDB: Starting crash recovery from" - " checkpoint LSN=" LSN_PF, - log_sys.next_checkpoint_lsn); - break; } } - while ((r= recv_sys.parse_pmem(store)) == recv_sys_t::OK) + if (!store) + skip_the_rest: + while ((r= recv_sys.parse_pmem<false>(false)) == recv_sys_t::OK); + else { - if (store != STORE_NO && recv_sys.is_memory_exhausted()) - { - ut_ad(last_phase == (store == STORE_IF_EXISTS)); - if (store == STORE_YES) + uint16_t count= 0; + while ((r= recv_sys.parse_pmem<true>(last_phase)) == recv_sys_t::OK) + if (!++count && recv_sys.report(time(nullptr))) { - store= STORE_NO; - recv_sys.last_stored_lsn= recv_sys.lsn; - } - else - { - ut_ad(store == STORE_IF_EXISTS); - recv_sys.apply(false); + const size_t n= recv_sys.pages.size(); + sql_print_information("InnoDB: Parsed redo log up to LSN=" LSN_PF + "; to recover: %zu pages", recv_sys.lsn, n); + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "Parsed redo log up to LSN=" LSN_PF + "; to recover: %zu pages", + recv_sys.lsn, n); } + if (r == recv_sys_t::GOT_OOM) + { + ut_ad(!last_phase); + rewound_lsn= recv_sys.lsn; + store= false; + if (recv_sys.scanned_lsn <= 1) + goto skip_the_rest; + ut_ad(recv_sys.file_checkpoint); + goto func_exit; } } if (r != recv_sys_t::PREMATURE_EOF) { ut_ad(r == recv_sys_t::GOT_EOF); + got_eof: + ut_ad(recv_sys.is_initialised()); + if (recv_sys.scanned_lsn > 1) + { + ut_ad(recv_sys.scanned_lsn == recv_sys.lsn); + break; + } + recv_sys.scanned_lsn= recv_sys.lsn; + sql_print_information("InnoDB: End of log at LSN=" LSN_PF, recv_sys.lsn); break; } @@ -3822,7 +4148,7 @@ static bool recv_scan_log(bool last_phase) break; if (recv_sys.offset < log_sys.get_block_size()) - break; + goto got_eof; if (recv_sys.offset > buf_size / 4 || (recv_sys.offset > block_size_1 && @@ -3835,21 +4161,21 @@ static bool recv_scan_log(bool last_phase) } } - const bool corrupt= recv_sys.is_corrupt_log() || recv_sys.is_corrupt_fs(); - recv_sys.maybe_finish_batch(); if (last_phase) + { + ut_ad(!rewound_lsn); + ut_ad(recv_sys.lsn >= recv_sys.file_checkpoint); log_sys.set_recovered_lsn(recv_sys.lsn); + } + else if (rewound_lsn) + { + ut_ad(!store); + ut_ad(recv_sys.file_checkpoint); + recv_sys.lsn= rewound_lsn; + } +func_exit: mysql_mutex_unlock(&recv_sys.mutex); - - if (corrupt) - DBUG_RETURN(false); - - DBUG_PRINT("ib_log", - ("%s " LSN_PF " completed", last_phase ? "rescan" : "scan", - recv_sys.lsn)); - ut_ad(!last_phase || recv_sys.lsn >= recv_sys.file_checkpoint); - - DBUG_RETURN(store == STORE_NO); + DBUG_RETURN(!store); } /** Report a missing tablespace for which page-redo log exists. @@ -3945,8 +4271,8 @@ next: /* fall through */ case file_name_t::DELETED: recv_sys_t::map::iterator r = p++; - r->second.log.clear(); - recv_sys.pages.erase(r); + recv_sys.pages_it_invalidate(r); + recv_sys.erase(r); continue; } ut_ad(0); @@ -3970,8 +4296,6 @@ func_exit: continue; } - missing_tablespace = true; - if (srv_force_recovery) { sql_print_warning("InnoDB: Tablespace " UINT32PF " was not found at %.*s," @@ -3991,14 +4315,11 @@ func_exit: rs.first, int(rs.second.name.size()), rs.second.name.data()); + } else { + missing_tablespace = true; } } - if (!rescan || srv_force_recovery > 0) { - missing_tablespace = false; - } - - err = DB_SUCCESS; goto func_exit; } @@ -4232,35 +4553,41 @@ read_only_recovery: goto early_exit; } - /* If there is any missing tablespace and rescan is needed - then there is a possiblity that hash table will not contain - all space ids redo logs. Rescan the remaining unstored - redo logs for the validation of missing tablespace. */ - ut_ad(rescan || !missing_tablespace); + if (missing_tablespace) { + ut_ad(rescan); + /* If any tablespaces seem to be missing, + validate the remaining log records. */ - while (missing_tablespace) { - recv_sys.lsn = recv_sys.last_stored_lsn; - DBUG_PRINT("ib_log", ("Rescan of redo log to validate " - "the missing tablespace. Scan " - "from last stored LSN " LSN_PF, - recv_sys.lsn)); - rescan = recv_scan_log(false); - ut_ad(!recv_sys.is_corrupt_fs()); + do { + rescan = recv_scan_log(false); + ut_ad(!recv_sys.is_corrupt_fs()); - missing_tablespace = false; + if (recv_sys.is_corrupt_log()) { + goto err_exit; + } - if (recv_sys.is_corrupt_log()) { - goto err_exit; - } + missing_tablespace = false; - err = recv_validate_tablespace( - rescan, missing_tablespace); + err = recv_validate_tablespace( + rescan, missing_tablespace); - if (err != DB_SUCCESS) { - goto early_exit; - } + if (err != DB_SUCCESS) { + goto early_exit; + } + } while (missing_tablespace); rescan = true; + /* Because in the loop above we overwrote the + initially stored recv_sys.pages, we must + restart parsing the log from the very beginning. */ + + /* FIXME: Use a separate loop for checking for + tablespaces (not individual pages), while retaining + the initial recv_sys.pages. */ + mysql_mutex_lock(&recv_sys.mutex); + recv_sys.clear(); + recv_sys.lsn = log_sys.next_checkpoint_lsn; + mysql_mutex_unlock(&recv_sys.mutex); } if (srv_operation <= SRV_OPERATION_EXPORT_RESTORED) { @@ -4271,8 +4598,7 @@ read_only_recovery: ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN); if (rescan) { - recv_sys.lsn = log_sys.next_checkpoint_lsn; - rescan = recv_scan_log(true); + recv_scan_log(true); if ((recv_sys.is_corrupt_log() && !srv_force_recovery) || recv_sys.is_corrupt_fs()) { diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc index aafa4361b0b..217cf153b59 100644 --- a/storage/innobase/os/os0file.cc +++ b/storage/innobase/os/os0file.cc @@ -3411,15 +3411,12 @@ os_file_get_status( return(ret); } - -extern void fil_aio_callback(const IORequest &request); - -static void io_callback(tpool::aiocb *cb) +static void io_callback_errorcheck(const tpool::aiocb *cb) { - const IORequest &request= *static_cast<const IORequest*> - (static_cast<const void*>(cb->m_userdata)); if (cb->m_err != DB_SUCCESS) { + const IORequest &request= *static_cast<const IORequest*> + (static_cast<const void*>(cb->m_userdata)); ib::fatal() << "IO Error: " << cb->m_err << " during " << (request.is_async() ? "async " : "sync ") << (request.is_LRU() ? "lru " : "") << @@ -3427,19 +3424,36 @@ static void io_callback(tpool::aiocb *cb) " of " << cb->m_len << " bytes, for file " << cb->m_fh << ", returned " << cb->m_ret_len; } - /* Return cb back to cache*/ - if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD) - { - ut_ad(read_slots->contains(cb)); - fil_aio_callback(request); - read_slots->release(cb); - } - else - { - ut_ad(write_slots->contains(cb)); - fil_aio_callback(request); - write_slots->release(cb); - } +} + +static void fake_io_callback(void *c) +{ + tpool::aiocb *cb= static_cast<tpool::aiocb*>(c); + ut_ad(read_slots->contains(cb)); + static_cast<const IORequest*>(static_cast<const void*>(cb->m_userdata))-> + fake_read_complete(cb->m_offset); + read_slots->release(cb); +} + +static void read_io_callback(void *c) +{ + tpool::aiocb *cb= static_cast<tpool::aiocb*>(c); + ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PREAD); + io_callback_errorcheck(cb); + ut_ad(read_slots->contains(cb)); + static_cast<const IORequest*> + (static_cast<const void*>(cb->m_userdata))->read_complete(); + read_slots->release(cb); +} + +static void write_io_callback(void *c) +{ + tpool::aiocb *cb= static_cast<tpool::aiocb*>(c); + ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PWRITE); + ut_ad(write_slots->contains(cb)); + static_cast<const IORequest*> + (static_cast<const void*>(cb->m_userdata))->write_complete(); + write_slots->release(cb); } #ifdef LINUX_NATIVE_AIO @@ -3684,6 +3698,28 @@ void os_aio_wait_until_no_pending_reads(bool declare) tpool::tpool_wait_end(); } +/** Submit a fake read request during crash recovery. +@param type fake read request +@param offset additional context */ +void os_fake_read(const IORequest &type, os_offset_t offset) +{ + tpool::aiocb *cb= read_slots->acquire(); + + cb->m_group= read_slots->get_task_group(); + cb->m_fh= type.node->handle.m_file; + cb->m_buffer= nullptr; + cb->m_len= 0; + cb->m_offset= offset; + cb->m_opcode= tpool::aio_opcode::AIO_PREAD; + new (cb->m_userdata) IORequest{type}; + cb->m_internal_task.m_func= fake_io_callback; + cb->m_internal_task.m_arg= cb; + cb->m_internal_task.m_group= cb->m_group; + + srv_thread_pool->submit_task(&cb->m_internal_task); +} + + /** Request a read or write. @param type I/O request @param buf buffer @@ -3729,23 +3765,32 @@ func_exit: return err; } + io_slots* slots; + tpool::callback_func callback; + tpool::aio_opcode opcode; + if (type.is_read()) { ++os_n_file_reads; + slots = read_slots; + callback = read_io_callback; + opcode = tpool::aio_opcode::AIO_PREAD; } else { ++os_n_file_writes; + slots = write_slots; + callback = write_io_callback; + opcode = tpool::aio_opcode::AIO_PWRITE; } compile_time_assert(sizeof(IORequest) <= tpool::MAX_AIO_USERDATA_LEN); - io_slots* slots= type.is_read() ? read_slots : write_slots; tpool::aiocb* cb = slots->acquire(); cb->m_buffer = buf; - cb->m_callback = (tpool::callback_func)io_callback; + cb->m_callback = callback; cb->m_group = slots->get_task_group(); cb->m_fh = type.node->handle.m_file; cb->m_len = (int)n; cb->m_offset = offset; - cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE; + cb->m_opcode = opcode; new (cb->m_userdata) IORequest{type}; if (srv_thread_pool->submit_io(cb)) { @@ -3753,6 +3798,7 @@ func_exit: os_file_handle_error(type.node->name, type.is_read() ? "aio read" : "aio write"); err = DB_IO_ERROR; + type.node->space->release(); } goto func_exit; |