diff options
Diffstat (limited to 'storage/innobase/buf/buf0flu.cc')
-rw-r--r-- | storage/innobase/buf/buf0flu.cc | 134 |
1 files changed, 84 insertions, 50 deletions
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 841a5eb46ae..d7343fbd9ed 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -31,7 +31,6 @@ Created 11/11/1995 Heikki Tuuri #include "buf0flu.h" #include "buf0buf.h" -#include "buf0mtflu.h" #include "buf0checksum.h" #include "srv0start.h" #include "srv0srv.h" @@ -151,6 +150,8 @@ struct page_cleaner_t { threads. */ os_event_t is_finished; /*!< event to signal that all slots were finished. */ + os_event_t is_started; /*!< event to signal that + thread is started/exiting */ volatile ulint n_workers; /*!< number of worker threads in existence */ bool requested; /*!< true if requested pages @@ -899,7 +900,7 @@ buf_flush_init_for_writing( /* Write the newest modification lsn to the page header and trailer */ mach_write_to_8(page + FIL_PAGE_LSN, newest_lsn); - mach_write_to_8(page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM, + mach_write_to_8(page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM, newest_lsn); if (block && srv_page_size == 16384) { @@ -930,6 +931,7 @@ buf_flush_init_for_writing( default: switch (page_type) { case FIL_PAGE_INDEX: + case FIL_PAGE_TYPE_INSTANT: case FIL_PAGE_RTREE: case FIL_PAGE_UNDO_LOG: case FIL_PAGE_INODE: @@ -993,7 +995,7 @@ buf_flush_init_for_writing( new enum is added and not handled here */ } - mach_write_to_4(page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM, + mach_write_to_4(page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM, checksum); } @@ -1018,7 +1020,7 @@ buf_flush_write_block_low( || space->purpose == FIL_TYPE_IMPORT || space->purpose == FIL_TYPE_TABLESPACE); ut_ad((space->purpose == FIL_TYPE_TEMPORARY) - == fsp_is_system_temporary(space->id)); + == (space == fil_system.temp_space)); page_t* frame = NULL; #ifdef UNIV_DEBUG buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); @@ -1128,7 +1130,7 @@ buf_flush_write_block_low( ut_ad(err == DB_SUCCESS); } - fil_space_release_for_io(space); + space->release_for_io(); /* Increment the counter of I/O operations used for selecting LRU policy. */ @@ -1848,6 +1850,7 @@ not guaranteed that the actual number is that big, though) @param[in] lsn_limit in the case of BUF_FLUSH_LIST all blocks whose oldest_modification is smaller than this should be flushed (if their number does not exceed min_n), otherwise ignored */ +static void buf_flush_batch( buf_pool_t* buf_pool, @@ -1887,6 +1890,7 @@ Gather the aggregated stats for both flush list and LRU list flushing. @param page_count_flush number of pages flushed from the end of the flush_list @param page_count_LRU number of pages flushed from the end of the LRU list */ +static void buf_flush_stats( /*============*/ @@ -1903,6 +1907,7 @@ buf_flush_stats( /******************************************************************//** Start a buffer flush batch for LRU or flush list */ +static ibool buf_flush_start( /*============*/ @@ -1934,22 +1939,8 @@ buf_flush_start( } /******************************************************************//** -Gather the aggregated stats for both flush list and LRU list flushing */ -void -buf_flush_common( -/*=============*/ - buf_flush_t flush_type, /*!< in: type of flush */ - ulint page_count) /*!< in: number of pages flushed */ -{ - buf_dblwr_flush_buffered_writes(); - - ut_a(flush_type == BUF_FLUSH_LRU || flush_type == BUF_FLUSH_LIST); - - srv_stats.buf_pool_flushed.add(page_count); -} - -/******************************************************************//** End a buffer flush batch for LRU or flush list */ +static void buf_flush_end( /*==========*/ @@ -2121,10 +2112,6 @@ buf_flush_lists( ulint n_flushed = 0; bool success = true; - if (buf_mtflu_init_done()) { - return(buf_mtflu_flush_list(min_n, lsn_limit, n_processed)); - } - if (n_processed) { *n_processed = 0; } @@ -2283,11 +2270,6 @@ buf_flush_LRU_list( memset(&n, 0, sizeof(flush_counters_t)); - if(buf_mtflu_init_done()) - { - return(buf_mtflu_flush_LRU_tail()); - } - ut_ad(buf_pool); /* srv_LRU_scan_depth can be arbitrarily large value. We cap it with current LRU size. */ @@ -2449,7 +2431,7 @@ page_cleaner_flush_pages_recommendation( cur_lsn = log_get_lsn_nowait(); - /* log_get_lsn_nowait tries to get log_sys->mutex with + /* log_get_lsn_nowait tries to get log_sys.mutex with mutex_enter_nowait, if this does not succeed function returns 0, do not use that value to update stats. */ if (cur_lsn == 0) { @@ -2713,7 +2695,7 @@ buf_flush_page_cleaner_init(void) page_cleaner.is_requested = os_event_create("pc_is_requested"); page_cleaner.is_finished = os_event_create("pc_is_finished"); - + page_cleaner.is_started = os_event_create("pc_is_started"); page_cleaner.n_slots = static_cast<ulint>(srv_buf_pool_instances); ut_d(page_cleaner.n_disabled_debug = 0); @@ -2788,8 +2770,8 @@ pc_flush_slot(void) { ulint lru_tm = 0; ulint list_tm = 0; - int lru_pass = 0; - int list_pass = 0; + ulint lru_pass = 0; + ulint list_pass = 0; mutex_enter(&page_cleaner.mutex); @@ -2993,17 +2975,10 @@ buf_flush_page_cleaner_disabled_loop(void) } /** Disables page cleaner threads (coordinator and workers). -It's used by: SET GLOBAL innodb_page_cleaner_disabled_debug = 1 (0). -@param[in] thd thread handle -@param[in] var pointer to system variable -@param[out] var_ptr where the formal string goes @param[in] save immediate result from check function */ -void -buf_flush_page_cleaner_disabled_debug_update( - THD* thd, - struct st_mysql_sys_var* var, - void* var_ptr, - const void* save) +void buf_flush_page_cleaner_disabled_debug_update(THD*, + st_mysql_sys_var*, void*, + const void* save) { if (!page_cleaner.is_running) { return; @@ -3433,6 +3408,7 @@ thread_exit: os_event_destroy(page_cleaner.is_finished); os_event_destroy(page_cleaner.is_requested); + os_event_destroy(page_cleaner.is_started); buf_page_cleaner_is_active = false; @@ -3444,6 +3420,35 @@ thread_exit: OS_THREAD_DUMMY_RETURN; } +/** Adjust thread count for page cleaner workers. +@param[in] new_cnt Number of threads to be used */ +void +buf_flush_set_page_cleaner_thread_cnt(ulong new_cnt) +{ + mutex_enter(&page_cleaner.mutex); + + srv_n_page_cleaners = new_cnt; + if (new_cnt > page_cleaner.n_workers) { + /* User has increased the number of page + cleaner threads. */ + ulint add = new_cnt - page_cleaner.n_workers; + for (ulint i = 0; i < add; i++) { + os_thread_id_t cleaner_thread_id; + os_thread_create(buf_flush_page_cleaner_worker, NULL, &cleaner_thread_id); + } + } + + mutex_exit(&page_cleaner.mutex); + + /* Wait until defined number of workers has started. */ + while (page_cleaner.is_running && + page_cleaner.n_workers != (srv_n_page_cleaners - 1)) { + os_event_set(page_cleaner.is_requested); + os_event_reset(page_cleaner.is_started); + os_event_wait_time(page_cleaner.is_started, 1000000); + } +} + /******************************************************************//** Worker thread of page_cleaner. @return a dummy parameter */ @@ -3456,9 +3461,18 @@ DECLARE_THREAD(buf_flush_page_cleaner_worker)( os_thread_create */ { my_thread_init(); +#ifndef DBUG_OFF + os_thread_id_t cleaner_thread_id = os_thread_get_curr_id(); +#endif mutex_enter(&page_cleaner.mutex); - page_cleaner.n_workers++; + ulint thread_no = page_cleaner.n_workers++; + + DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id + << " started; n_workers=" << page_cleaner.n_workers); + + /* Signal that we have started */ + os_event_set(page_cleaner.is_started); mutex_exit(&page_cleaner.mutex); #ifdef UNIV_LINUX @@ -3481,11 +3495,31 @@ DECLARE_THREAD(buf_flush_page_cleaner_worker)( break; } + ut_ad(srv_n_page_cleaners >= 1); + + /* If number of page cleaner threads is decreased + exit those that are not anymore needed. */ + if (srv_shutdown_state == SRV_SHUTDOWN_NONE && + thread_no >= (srv_n_page_cleaners - 1)) { + DBUG_LOG("ib_buf", "Exiting " + << thread_no + << " page cleaner worker thread_id " + << os_thread_pf(cleaner_thread_id) + << " total threads " << srv_n_page_cleaners << "."); + break; + } + pc_flush_slot(); } mutex_enter(&page_cleaner.mutex); page_cleaner.n_workers--; + + DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id + << " exiting; n_workers=" << page_cleaner.n_workers); + + /* Signal that we have stopped */ + os_event_set(page_cleaner.is_started); mutex_exit(&page_cleaner.mutex); my_thread_end(); @@ -3690,17 +3724,17 @@ buf_flush_get_dirty_pages_count( } /** FlushObserver constructor -@param[in] space_id table space id +@param[in] space tablespace @param[in] trx trx instance @param[in] stage performance schema accounting object, used by ALTER TABLE. It is passed to log_preflush_pool_modified_pages() for accounting. */ FlushObserver::FlushObserver( - ulint space_id, + fil_space_t* space, trx_t* trx, ut_stage_alter_t* stage) : - m_space_id(space_id), + m_space(space), m_trx(trx), m_stage(stage), m_interrupted(false) @@ -3719,7 +3753,7 @@ FlushObserver::FlushObserver( /** FlushObserver deconstructor */ FlushObserver::~FlushObserver() { - ut_ad(buf_flush_get_dirty_pages_count(m_space_id, this) == 0); + ut_ad(buf_flush_get_dirty_pages_count(m_space->id, this) == 0); UT_DELETE(m_flushed); UT_DELETE(m_removed); @@ -3777,10 +3811,10 @@ FlushObserver::flush() if (!m_interrupted && m_stage) { m_stage->begin_phase_flush(buf_flush_get_dirty_pages_count( - m_space_id, this)); + m_space->id, this)); } - buf_LRU_flush_or_remove_pages(m_space_id, this); + buf_LRU_flush_or_remove_pages(m_space->id, this); /* Wait for all dirty pages were flushed. */ for (ulint i = 0; i < srv_buf_pool_instances; i++) { |