diff options
Diffstat (limited to 'storage/innobase/buf')
-rw-r--r-- | storage/innobase/buf/buf0buddy.cc | 38 | ||||
-rw-r--r-- | storage/innobase/buf/buf0buf.cc | 232 | ||||
-rw-r--r-- | storage/innobase/buf/buf0checksum.cc | 2 | ||||
-rw-r--r-- | storage/innobase/buf/buf0dblwr.cc | 79 | ||||
-rw-r--r-- | storage/innobase/buf/buf0dump.cc | 53 | ||||
-rw-r--r-- | storage/innobase/buf/buf0flu.cc | 134 | ||||
-rw-r--r-- | storage/innobase/buf/buf0lru.cc | 37 | ||||
-rw-r--r-- | storage/innobase/buf/buf0mtflu.cc | 736 | ||||
-rw-r--r-- | storage/innobase/buf/buf0rea.cc | 6 |
9 files changed, 341 insertions, 976 deletions
diff --git a/storage/innobase/buf/buf0buddy.cc b/storage/innobase/buf/buf0buddy.cc index 0863facad52..7a7c3189add 100644 --- a/storage/innobase/buf/buf0buddy.cc +++ b/storage/innobase/buf/buf0buddy.cc @@ -73,10 +73,6 @@ list. This value is stamped at BUF_BUDDY_STAMP_OFFSET offset */ value by the consumer of the block */ #define BUF_BUDDY_STAMP_NONFREE 0XFFFFFFFFUL -#if BUF_BUDDY_STAMP_FREE >= BUF_BUDDY_STAMP_NONFREE -# error "BUF_BUDDY_STAMP_FREE >= BUF_BUDDY_STAMP_NONFREE" -#endif - /** Return type of buf_buddy_is_free() */ enum buf_buddy_state_t { BUF_BUDDY_STATE_FREE, /*!< If the buddy to completely free */ @@ -114,6 +110,7 @@ buf_buddy_stamp_is_free( /*====================*/ const buf_buddy_free_t* buf) /*!< in: block to check */ { + compile_time_assert(BUF_BUDDY_STAMP_FREE < BUF_BUDDY_STAMP_NONFREE); return(mach_read_from_4(buf->stamp.bytes + BUF_BUDDY_STAMP_OFFSET) == BUF_BUDDY_STAMP_FREE); } @@ -138,13 +135,12 @@ buf_buddy_stamp_free( Stamps a buddy nonfree. @param[in,out] buf block to stamp @param[in] i block size */ -#define buf_buddy_stamp_nonfree(buf, i) do { \ - buf_buddy_mem_invalid(buf, i); \ - memset(buf->stamp.bytes + BUF_BUDDY_STAMP_OFFSET, 0xff, 4); \ -} while (0) -#if BUF_BUDDY_STAMP_NONFREE != 0xffffffff -# error "BUF_BUDDY_STAMP_NONFREE != 0xffffffff" -#endif +static inline void buf_buddy_stamp_nonfree(buf_buddy_free_t* buf, ulint i) +{ + buf_buddy_mem_invalid(buf, i); + compile_time_assert(BUF_BUDDY_STAMP_NONFREE == 0xffffffffU); + memset(buf->stamp.bytes + BUF_BUDDY_STAMP_OFFSET, 0xff, 4); +} /**********************************************************************//** Get the offset of the buddy of a compressed page frame. 
@@ -160,7 +156,7 @@ buf_buddy_get( ut_ad(size >= BUF_BUDDY_LOW); ut_ad(BUF_BUDDY_LOW <= UNIV_ZIP_SIZE_MIN); ut_ad(size < BUF_BUDDY_HIGH); - ut_ad(BUF_BUDDY_HIGH == UNIV_PAGE_SIZE); + ut_ad(BUF_BUDDY_HIGH == srv_page_size); ut_ad(!ut_align_offset(page, size)); if (((ulint) page) & size) { @@ -375,7 +371,7 @@ buf_buddy_alloc_zip( } /**********************************************************************//** -Deallocate a buffer frame of UNIV_PAGE_SIZE. */ +Deallocate a buffer frame of srv_page_size. */ static void buf_buddy_block_free( @@ -389,7 +385,7 @@ buf_buddy_block_free( ut_ad(buf_pool_mutex_own(buf_pool)); ut_ad(!mutex_own(&buf_pool->zip_mutex)); - ut_a(!ut_align_offset(buf, UNIV_PAGE_SIZE)); + ut_a(!ut_align_offset(buf, srv_page_size)); HASH_SEARCH(hash, buf_pool->zip_hash, fold, buf_page_t*, bpage, ut_ad(buf_page_get_state(bpage) == BUF_BLOCK_MEMORY @@ -402,8 +398,8 @@ buf_buddy_block_free( ut_d(bpage->in_zip_hash = FALSE); HASH_DELETE(buf_page_t, hash, buf_pool->zip_hash, fold, bpage); - ut_d(memset(buf, 0, UNIV_PAGE_SIZE)); - UNIV_MEM_INVALID(buf, UNIV_PAGE_SIZE); + ut_d(memset(buf, 0, srv_page_size)); + UNIV_MEM_INVALID(buf, srv_page_size); block = (buf_block_t*) bpage; buf_page_mutex_enter(block); @@ -431,7 +427,7 @@ buf_buddy_block_register( buf_block_set_state(block, BUF_BLOCK_MEMORY); ut_a(block->frame); - ut_a(!ut_align_offset(block->frame, UNIV_PAGE_SIZE)); + ut_a(!ut_align_offset(block->frame, srv_page_size)); ut_ad(!block->page.in_page_hash); ut_ad(!block->page.in_zip_hash); @@ -488,8 +484,8 @@ buf_buddy_alloc_low( buf_pool_t* buf_pool, /*!< in/out: buffer pool instance */ ulint i, /*!< in: index of buf_pool->zip_free[], or BUF_BUDDY_SIZES */ - ibool* lru) /*!< in: pointer to a variable that - will be assigned TRUE if storage was + bool* lru) /*!< in: pointer to a variable that + will be assigned true if storage was allocated from the LRU list and buf_pool->mutex was temporarily released */ @@ -520,7 +516,7 @@ buf_buddy_alloc_low( /* Try replacing 
an uncompressed page in the buffer pool. */ buf_pool_mutex_exit(buf_pool); block = buf_LRU_get_free_block(buf_pool); - *lru = TRUE; + *lru = true; buf_pool_mutex_enter(buf_pool); alloc_big: @@ -763,7 +759,7 @@ func_exit: @param[in] buf_pool buffer pool instance @param[in] buf block to be reallocated, must be pointed to by the buffer pool -@param[in] size block size, up to UNIV_PAGE_SIZE +@param[in] size block size, up to srv_page_size @retval false if failed because of no free blocks. */ bool buf_buddy_realloc( diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index b5ca51c81dc..6b46cc7097b 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -284,8 +284,8 @@ reachable via buf_pool->chunks[]. The chains of free memory blocks (buf_pool->zip_free[]) are used by the buddy allocator (buf0buddy.cc) to keep track of currently unused -memory blocks of size sizeof(buf_page_t)..UNIV_PAGE_SIZE / 2. These -blocks are inside the UNIV_PAGE_SIZE-sized memory blocks of type +memory blocks of size sizeof(buf_page_t)..srv_page_size / 2. These +blocks are inside the srv_page_size-sized memory blocks of type BUF_BLOCK_MEMORY that the buddy allocator requests from the buffer pool. The buddy allocator is solely used for allocating control blocks for compressed pages (buf_page_t) and compressed page frames. @@ -522,7 +522,7 @@ static bool buf_tmp_page_decrypt(byte* tmp_frame, byte* src_frame) @return whether the operation was successful */ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space) { - ut_ad(space->n_pending_ios > 0); + ut_ad(space->pending_io()); ut_ad(space->id == bpage->id.space()); byte* dst_frame = bpage->zip.data ? 
bpage->zip.data : @@ -572,7 +572,7 @@ decompress_with_slot: slot->release(); ut_ad(!bpage->write_size || fil_page_type_validate(dst_frame)); - ut_ad(space->n_pending_ios > 0); + ut_ad(space->pending_io()); return bpage->write_size != 0; } @@ -615,13 +615,10 @@ decrypt_failed: goto decompress; } - ut_ad(space->n_pending_ios > 0); + ut_ad(space->pending_io()); return true; } -/* prototypes for new functions added to ha_innodb.cc */ -trx_t* innobase_get_trx(); - /********************************************************************//** Gets the smallest oldest_modification lsn for any page in the pool. Returns zero if all modified pages have been flushed to disk. @@ -721,7 +718,8 @@ buf_get_total_list_size_in_bytes( for statistics purpose */ buf_pools_list_size->LRU_bytes += buf_pool->stat.LRU_bytes; buf_pools_list_size->unzip_LRU_bytes += - UT_LIST_GET_LEN(buf_pool->unzip_LRU) * UNIV_PAGE_SIZE; + UT_LIST_GET_LEN(buf_pool->unzip_LRU) + << srv_page_size_shift; buf_pools_list_size->flush_list_bytes += buf_pool->stat.flush_list_bytes; } @@ -1097,9 +1095,7 @@ buf_page_is_corrupted( checksum_field2 = mach_read_from_4( read_buf + page_size.logical() - FIL_PAGE_END_LSN_OLD_CHKSUM); -#if FIL_PAGE_LSN % 8 -#error "FIL_PAGE_LSN must be 64 bit aligned" -#endif + compile_time_assert(!(FIL_PAGE_LSN % 8)); /* A page filled with NUL bytes is considered not corrupted. The FIL_PAGE_FILE_FLUSH_LSN field may be written nonzero for @@ -1257,6 +1253,56 @@ buf_page_is_corrupted( } #ifndef UNIV_INNOCHECKSUM + +#if defined(DBUG_OFF) && defined(HAVE_MADVISE) && defined(MADV_DODUMP) +/** Enable buffers to be dumped to core files + +A convenience function, not called anywhere directly; however, +it is left available for gdb or any debugger to call +in the event that you want all of the memory to be dumped +to a core file. + +Returns number of errors found in madvise calls. 
*/ +int +buf_madvise_do_dump() +{ + int ret= 0; + buf_pool_t* buf_pool; + buf_chunk_t* chunk; + + /* mirrors allocation in log_t::create() */ + if (log_sys.buf) { + ret+= madvise(log_sys.first_in_use + ? log_sys.buf + : log_sys.buf - srv_log_buffer_size, + srv_log_buffer_size * 2, + MADV_DODUMP); + } + /* mirrors recv_sys_init() */ + if (recv_sys->buf) + { + ret+= madvise(recv_sys->buf, recv_sys->len, MADV_DODUMP); + } + + buf_pool_mutex_enter_all(); + + for (ulong i= 0; i < srv_buf_pool_instances; i++) + { + buf_pool = buf_pool_from_array(i); + chunk = buf_pool->chunks; + + for (int n = buf_pool->n_chunks; n--; chunk++) + { + ret+= madvise(chunk->mem, chunk->mem_size(), MADV_DODUMP); + } + } + + buf_pool_mutex_exit_all(); + + return ret; +} +#endif + /** Dump a page to stderr. @param[in] read_buf database page @param[in] page_size page size */ @@ -1378,20 +1424,10 @@ buf_page_print(const byte* read_buf, const page_size_t& page_size) read_buf + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID); } - if (mach_read_from_2(read_buf + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE) - == TRX_UNDO_INSERT) { - fprintf(stderr, - "InnoDB: Page may be an insert undo log page\n"); - } else if (mach_read_from_2(read_buf + TRX_UNDO_PAGE_HDR - + TRX_UNDO_PAGE_TYPE) - == TRX_UNDO_UPDATE) { - fprintf(stderr, - "InnoDB: Page may be an update undo log page\n"); - } - switch (fil_page_get_type(read_buf)) { index_id_t index_id; case FIL_PAGE_INDEX: + case FIL_PAGE_TYPE_INSTANT: case FIL_PAGE_RTREE: index_id = btr_page_get_index_id(read_buf); ib::info() << "Page may be an index page where" @@ -1405,6 +1441,9 @@ buf_page_print(const byte* read_buf, const page_size_t& page_size) << " in table " << index->table->name; } break; + case FIL_PAGE_UNDO_LOG: + fputs("InnoDB: Page may be an undo log page\n", stderr); + break; case FIL_PAGE_INODE: fputs("InnoDB: Page may be an 'inode' page\n", stderr); break; @@ -1513,7 +1552,7 @@ buf_block_init( buf_block_t* block, /*!< in: pointer to control block */ byte* frame) /*!< 
in: pointer to buffer frame */ { - UNIV_MEM_DESC(frame, UNIV_PAGE_SIZE); + UNIV_MEM_DESC(frame, srv_page_size); /* This function should only be executed at database startup or by buf_pool_resize(). Either way, adaptive hash index must not exist. */ @@ -1595,15 +1634,16 @@ buf_chunk_init( /* Round down to a multiple of page size, although it already should be. */ - mem_size = ut_2pow_round(mem_size, UNIV_PAGE_SIZE); + mem_size = ut_2pow_round<ulint>(mem_size, srv_page_size); /* Reserve space for the block descriptors. */ - mem_size += ut_2pow_round((mem_size / UNIV_PAGE_SIZE) * (sizeof *block) - + (UNIV_PAGE_SIZE - 1), UNIV_PAGE_SIZE); + mem_size += ut_2pow_round<ulint>((mem_size >> srv_page_size_shift) + * (sizeof *block) + + (srv_page_size - 1), + srv_page_size); DBUG_EXECUTE_IF("ib_buf_chunk_init_fails", return(NULL);); - chunk->mem = buf_pool->allocator.allocate_large(mem_size, - &chunk->mem_pfx); + chunk->mem = buf_pool->allocator.allocate_large_dontdump(mem_size, &chunk->mem_pfx); if (UNIV_UNLIKELY(chunk->mem == NULL)) { @@ -1632,12 +1672,12 @@ buf_chunk_init( chunk->blocks = (buf_block_t*) chunk->mem; /* Align a pointer to the first frame. Note that when - opt_large_page_size is smaller than UNIV_PAGE_SIZE, + opt_large_page_size is smaller than srv_page_size, we may allocate one fewer block than requested. When it is bigger, we may allocate more blocks than requested. */ - frame = (byte*) ut_align(chunk->mem, UNIV_PAGE_SIZE); - chunk->size = chunk->mem_pfx.m_size / UNIV_PAGE_SIZE + frame = (byte*) ut_align(chunk->mem, srv_page_size); + chunk->size = (chunk->mem_pfx.m_size >> srv_page_size_shift) - (frame != chunk->mem); /* Subtract the space needed for block descriptors. 
*/ @@ -1645,7 +1685,7 @@ buf_chunk_init( ulint size = chunk->size; while (frame < (byte*) (chunk->blocks + size)) { - frame += UNIV_PAGE_SIZE; + frame += srv_page_size; size--; } @@ -1661,7 +1701,7 @@ buf_chunk_init( for (i = chunk->size; i--; ) { buf_block_init(buf_pool, block, frame); - UNIV_MEM_INVALID(block->frame, UNIV_PAGE_SIZE); + UNIV_MEM_INVALID(block->frame, srv_page_size); /* Add the block to the free list */ UT_LIST_ADD_LAST(buf_pool->free, &block->page); @@ -1670,7 +1710,7 @@ buf_chunk_init( ut_ad(buf_pool_from_block(block) == buf_pool); block++; - frame += UNIV_PAGE_SIZE; + frame += srv_page_size; } buf_pool_register_chunk(chunk); @@ -1896,8 +1936,8 @@ buf_pool_init_instance( &block->debug_latch)); } - buf_pool->allocator.deallocate_large( - chunk->mem, &chunk->mem_pfx); + buf_pool->allocator.deallocate_large_dodump( + chunk->mem, &chunk->mem_pfx, chunk->mem_size()); } ut_free(buf_pool->chunks); buf_pool_mutex_exit(buf_pool); @@ -1913,7 +1953,8 @@ buf_pool_init_instance( ut_min(BUF_READ_AHEAD_PAGES, ut_2_power_up(buf_pool->curr_size / BUF_READ_AHEAD_PORTION)); - buf_pool->curr_pool_size = buf_pool->curr_size * UNIV_PAGE_SIZE; + buf_pool->curr_pool_size = buf_pool->curr_size + << srv_page_size_shift; buf_pool->old_size = buf_pool->curr_size; buf_pool->n_chunks_new = buf_pool->n_chunks; @@ -2043,8 +2084,8 @@ buf_pool_free_instance( ut_d(rw_lock_free(&block->debug_latch)); } - buf_pool->allocator.deallocate_large( - chunk->mem, &chunk->mem_pfx); + buf_pool->allocator.deallocate_large_dodump( + chunk->mem, &chunk->mem_pfx, chunk->mem_size()); } for (ulint i = BUF_FLUSH_LRU; i < BUF_FLUSH_N_TYPES; ++i) { @@ -2241,7 +2282,7 @@ buf_page_realloc( buf_block_modify_clock_inc(block); memset(block->frame + FIL_PAGE_OFFSET, 0xff, 4); memset(block->frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, 0xff, 4); - UNIV_MEM_INVALID(block->frame, UNIV_PAGE_SIZE); + UNIV_MEM_INVALID(block->frame, srv_page_size); buf_block_set_state(block, BUF_BLOCK_REMOVE_HASH); block->page.id = 
page_id_t(ULINT32_UNDEFINED, ULINT32_UNDEFINED); @@ -2365,7 +2406,7 @@ buf_frame_will_withdrawn( while (chunk < echunk) { if (ptr >= chunk->blocks->frame && ptr < (chunk->blocks + chunk->size - 1)->frame - + UNIV_PAGE_SIZE) { + + srv_page_size) { return(true); } ++chunk; @@ -2703,7 +2744,7 @@ buf_pool_resize() ut_ad(srv_buf_pool_chunk_unit > 0); new_instance_size = srv_buf_pool_size / srv_buf_pool_instances; - new_instance_size /= UNIV_PAGE_SIZE; + new_instance_size >>= srv_page_size_shift; buf_resize_status("Resizing buffer pool from " ULINTPF " to " ULINTPF " (unit=" ULINTPF ").", @@ -2722,7 +2763,8 @@ buf_pool_resize() buf_pool->curr_size = new_instance_size; - buf_pool->n_chunks_new = new_instance_size * UNIV_PAGE_SIZE + buf_pool->n_chunks_new = + (new_instance_size << srv_page_size_shift) / srv_buf_pool_chunk_unit; buf_pool_mutex_exit(buf_pool); @@ -2810,11 +2852,11 @@ withdraw_retry: } lock_mutex_enter(); - trx_sys_mutex_enter(); + mutex_enter(&trx_sys.mutex); bool found = false; - for (trx_t* trx = UT_LIST_GET_FIRST(trx_sys->mysql_trx_list); + for (trx_t* trx = UT_LIST_GET_FIRST(trx_sys.trx_list); trx != NULL; - trx = UT_LIST_GET_NEXT(mysql_trx_list, trx)) { + trx = UT_LIST_GET_NEXT(trx_list, trx)) { if (trx->state != TRX_STATE_NOT_STARTED && trx->mysql_thd != NULL && withdraw_started > trx->start_time) { @@ -2833,7 +2875,7 @@ withdraw_retry: stderr, trx, current_time); } } - trx_sys_mutex_exit(); + mutex_exit(&trx_sys.mutex); lock_mutex_exit(); withdraw_started = current_time; @@ -2921,8 +2963,8 @@ withdraw_retry: &block->debug_latch)); } - buf_pool->allocator.deallocate_large( - chunk->mem, &chunk->mem_pfx); + buf_pool->allocator.deallocate_large_dodump( + chunk->mem, &chunk->mem_pfx, chunk->mem_size()); sum_freed += chunk->size; @@ -3059,7 +3101,7 @@ calc_buf_pool_size: ut_2_power_up(buf_pool->curr_size / BUF_READ_AHEAD_PORTION)); buf_pool->curr_pool_size - = buf_pool->curr_size * UNIV_PAGE_SIZE; + = buf_pool->curr_size << srv_page_size_shift; curr_size 
+= buf_pool->curr_pool_size; buf_pool->old_size = buf_pool->curr_size; } @@ -3111,8 +3153,9 @@ calc_buf_pool_size: buf_resize_status("Resizing also other hash tables."); /* normalize lock_sys */ - srv_lock_table_size = 5 * (srv_buf_pool_size / UNIV_PAGE_SIZE); - lock_sys_resize(srv_lock_table_size); + srv_lock_table_size = 5 + * (srv_buf_pool_size >> srv_page_size_shift); + lock_sys.resize(srv_lock_table_size); /* normalize btr_search_sys */ btr_search_sys_resize( @@ -4002,7 +4045,7 @@ buf_zip_decompress( if (page_zip_decompress(&block->page.zip, block->frame, TRUE)) { if (space) { - fil_space_release_for_io(space); + space->release_for_io(); } return(TRUE); } @@ -4021,7 +4064,7 @@ buf_zip_decompress( /* Copy to uncompressed storage. */ memcpy(block->frame, frame, block->page.size.physical()); if (space) { - fil_space_release_for_io(space); + space->release_for_io(); } return(TRUE); @@ -4036,13 +4079,16 @@ err_exit: if (encrypted) { ib::info() << "Row compressed page could be encrypted" " with key_version " << key_version; - dict_set_encrypted_by_space(block->page.id.space()); - } else { - dict_set_corrupted_by_space(block->page.id.space()); } if (space) { - fil_space_release_for_io(space); + if (encrypted) { + dict_set_encrypted_by_space(space); + } else { + dict_set_corrupted_by_space(space); + } + + space->release_for_io(); } return(FALSE); @@ -4073,16 +4119,16 @@ buf_block_from_ahi(const byte* ptr) chunk = (--it)->second; } - ulint offs = ptr - chunk->blocks->frame; + ulint offs = ulint(ptr - chunk->blocks->frame); - offs >>= UNIV_PAGE_SIZE_SHIFT; + offs >>= srv_page_size_shift; ut_a(offs < chunk->size); buf_block_t* block = &chunk->blocks[offs]; /* The function buf_chunk_init() invokes buf_block_init() so that - block[n].frame == block->frame + n * UNIV_PAGE_SIZE. Check it. */ + block[n].frame == block->frame + n * srv_page_size. Check it. */ ut_ad(block->frame == page_align(ptr)); /* Read the state of the block without holding a mutex. 
A state transition from BUF_BLOCK_FILE_PAGE to @@ -4451,9 +4497,16 @@ loop: /* Try to set table as corrupted instead of asserting. */ - if (page_id.space() != TRX_SYS_SPACE && - dict_set_corrupted_by_space(page_id.space())) { - return (NULL); + if (page_id.space() == TRX_SYS_SPACE) { + } else if (page_id.space() == SRV_TMP_SPACE_ID) { + } else if (fil_space_t* space + = fil_space_acquire_for_io( + page_id.space())) { + bool set = dict_set_corrupted_by_space(space); + space->release_for_io(); + if (set) { + return NULL; + } } ib::fatal() << "Unable to read page " << page_id @@ -4466,9 +4519,7 @@ loop: } #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG - ut_a(fsp_skip_sanity_check(page_id.space()) - || ++buf_dbg_counter % 5771 - || buf_validate()); + ut_a(++buf_dbg_counter % 5771 || buf_validate()); #endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */ goto loop; } else { @@ -4856,9 +4907,7 @@ evict_from_pool: } #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG - ut_a(fsp_skip_sanity_check(page_id.space()) - || ++buf_dbg_counter % 5771 - || buf_validate()); + ut_a(++buf_dbg_counter % 5771 || buf_validate()); ut_a(buf_block_get_state(fix_block) == BUF_BLOCK_FILE_PAGE); #endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */ @@ -5012,9 +5061,7 @@ buf_page_optimistic_get( mtr_memo_push(mtr, block, fix_type); #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG - ut_a(fsp_skip_sanity_check(block->page.id.space()) - || ++buf_dbg_counter % 5771 - || buf_validate()); + ut_a(++buf_dbg_counter % 5771 || buf_validate()); ut_a(block->page.buf_fix_count > 0); ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE); #endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */ @@ -5204,9 +5251,7 @@ buf_page_try_get_func( mtr_memo_push(mtr, block, fix_type); #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG - ut_a(fsp_skip_sanity_check(block->page.id.space()) - || ++buf_dbg_counter % 5771 - || buf_validate()); + ut_a(++buf_dbg_counter % 5771 || buf_validate()); ut_a(block->page.buf_fix_count > 0); ut_a(buf_block_get_state(block) == 
BUF_BLOCK_FILE_PAGE); #endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */ @@ -5278,7 +5323,7 @@ buf_page_init( /* Silence valid Valgrind warnings about uninitialized data being written to data files. There are some unused bytes on some pages that InnoDB does not initialize. */ - UNIV_MEM_VALID(block->frame, UNIV_PAGE_SIZE); + UNIV_MEM_VALID(block->frame, srv_page_size); } #endif /* UNIV_DEBUG_VALGRIND */ @@ -5362,7 +5407,7 @@ buf_page_init_for_read( buf_page_t* watch_page; rw_lock_t* hash_lock; mtr_t mtr; - ibool lru = FALSE; + bool lru = false; void* data; buf_pool_t* buf_pool = buf_pool_get(page_id); @@ -5656,7 +5701,7 @@ buf_page_create( if (page_size.is_compressed()) { void* data; - ibool lru; + bool lru; /* Prevent race conditions during buf_buddy_alloc(), which may release and reacquire buf_pool->mutex, @@ -5750,13 +5795,14 @@ buf_page_monitor( switch (fil_page_get_type(frame)) { ulint level; - + case FIL_PAGE_TYPE_INSTANT: case FIL_PAGE_INDEX: case FIL_PAGE_RTREE: - level = btr_page_get_level_low(frame); + level = btr_page_get_level(frame); /* Check if it is an index page for insert buffer */ - if (btr_page_get_index_id(frame) + if (fil_page_get_type(frame) == FIL_PAGE_INDEX + && btr_page_get_index_id(frame) == (index_id_t)(DICT_IBUF_ID_MIN + IBUF_SPACE_ID)) { if (level == 0) { counter = MONITOR_RW_COUNTER( @@ -5841,9 +5887,9 @@ static void buf_mark_space_corrupt(buf_page_t* bpage, const fil_space_t& space) are marked unusable later e.g. in ::open(). */ if (!space.crypt_data || space.crypt_data->type == CRYPT_SCHEME_UNENCRYPTED) { - dict_set_corrupted_by_space(bpage->id.space()); + dict_set_corrupted_by_space(&space); } else { - dict_set_encrypted_by_space(bpage->id.space()); + dict_set_encrypted_by_space(&space); } } @@ -5907,7 +5953,7 @@ after decryption normal page checksum does not match. 
@retval DB_TABLESPACE_DELETED if accessed tablespace is not found */ static dberr_t buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space) { - ut_ad(space->n_pending_ios > 0); + ut_ad(space->pending_io()); byte* dst_frame = (bpage->zip.data) ? bpage->zip.data : ((buf_block_t*) bpage)->frame; @@ -6015,7 +6061,7 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict) my_atomic_addlint(&buf_pool->n_pend_unzip, 1); ibool ok = buf_zip_decompress((buf_block_t*) bpage, FALSE); - my_atomic_addlint(&buf_pool->n_pend_unzip, -1); + my_atomic_addlint(&buf_pool->n_pend_unzip, ulint(-1)); if (!ok) { ib::info() << "Page " @@ -6070,7 +6116,7 @@ database_corrupted: buf_corrupt_page_release(bpage, space); ib::info() << "Simulated IMPORT " "corruption"; - fil_space_release_for_io(space); + space->release_for_io(); return(err); } err = DB_SUCCESS; @@ -6112,7 +6158,7 @@ database_corrupted: } buf_corrupt_page_release(bpage, space); - fil_space_release_for_io(space); + space->release_for_io(); return(err); } } @@ -6130,7 +6176,7 @@ database_corrupted: recv_recover_corrupt_page(corrupt_page_id); } - fil_space_release_for_io(space); + space->release_for_io(); return err; } @@ -6153,7 +6199,7 @@ database_corrupted: &bpage->size, TRUE); } - fil_space_release_for_io(space); + space->release_for_io(); } else { /* io_type == BUF_IO_WRITE */ if (bpage->slot) { @@ -7369,7 +7415,7 @@ buf_page_encrypt_before_write( byte* src_frame) { ut_ad(space->id == bpage->id.space()); - bpage->real_size = UNIV_PAGE_SIZE; + bpage->real_size = srv_page_size; fil_page_type_validate(src_frame); @@ -7435,7 +7481,7 @@ not_compressed: src_frame, dst_frame); } - bpage->real_size = UNIV_PAGE_SIZE; + bpage->real_size = srv_page_size; slot->out_buf = dst_frame = tmp; ut_d(fil_page_type_validate(tmp)); diff --git a/storage/innobase/buf/buf0checksum.cc b/storage/innobase/buf/buf0checksum.cc index 9e81b0384c6..70ad5ed600b 100644 --- a/storage/innobase/buf/buf0checksum.cc +++ 
b/storage/innobase/buf/buf0checksum.cc @@ -112,7 +112,7 @@ buf_calc_page_new_checksum(const byte* page) FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION - FIL_PAGE_OFFSET) + ut_fold_binary(page + FIL_PAGE_DATA, - UNIV_PAGE_SIZE - FIL_PAGE_DATA + srv_page_size - FIL_PAGE_DATA - FIL_PAGE_END_LSN_OLD_CHKSUM); return(static_cast<uint32_t>(checksum)); } diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc index 78a7c06a3f0..6e5dee67714 100644 --- a/storage/innobase/buf/buf0dblwr.cc +++ b/storage/innobase/buf/buf0dblwr.cc @@ -148,11 +148,11 @@ buf_dblwr_init( ut_zalloc_nokey(buf_size * sizeof(bool))); buf_dblwr->write_buf_unaligned = static_cast<byte*>( - ut_malloc_nokey((1 + buf_size) * UNIV_PAGE_SIZE)); + ut_malloc_nokey((1 + buf_size) << srv_page_size_shift)); buf_dblwr->write_buf = static_cast<byte*>( ut_align(buf_dblwr->write_buf_unaligned, - UNIV_PAGE_SIZE)); + srv_page_size)); buf_dblwr->buf_block_arr = static_cast<buf_page_t**>( ut_zalloc_nokey(buf_size * sizeof(void*))); @@ -197,17 +197,13 @@ start_again: buf_dblwr_being_created = FALSE; return(true); } else { - fil_space_t* space = fil_space_acquire(TRX_SYS_SPACE); - const bool fail = UT_LIST_GET_FIRST(space->chain)->size - < 3 * FSP_EXTENT_SIZE; - fil_space_release(space); - - if (fail) { + if (UT_LIST_GET_FIRST(fil_system.sys_space->chain)->size + < 3 * FSP_EXTENT_SIZE) { goto too_small; } } - block2 = fseg_create(TRX_SYS_SPACE, TRX_SYS_PAGE_NO, + block2 = fseg_create(fil_system.sys_space, TRX_SYS_PAGE_NO, TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_FSEG, &mtr); @@ -217,7 +213,8 @@ too_small: << "Cannot create doublewrite buffer: " "the first file in innodb_data_file_path" " must be at least " - << (3 * (FSP_EXTENT_SIZE * UNIV_PAGE_SIZE) >> 20) + << (3 * (FSP_EXTENT_SIZE + >> (20U - srv_page_size_shift))) << "M."; mtr.commit(); return(false); @@ -366,10 +363,10 @@ buf_dblwr_init_or_load_pages( /* We do the file i/o past the buffer pool */ unaligned_read_buf = static_cast<byte*>( - 
ut_malloc_nokey(3 * UNIV_PAGE_SIZE)); + ut_malloc_nokey(3U << srv_page_size_shift)); read_buf = static_cast<byte*>( - ut_align(unaligned_read_buf, UNIV_PAGE_SIZE)); + ut_align(unaligned_read_buf, srv_page_size)); /* Read the trx sys header to check if we are using the doublewrite buffer */ @@ -379,8 +376,8 @@ buf_dblwr_init_or_load_pages( err = os_file_read( read_request, - file, read_buf, TRX_SYS_PAGE_NO * UNIV_PAGE_SIZE, - UNIV_PAGE_SIZE); + file, read_buf, TRX_SYS_PAGE_NO << srv_page_size_shift, + srv_page_size); if (err != DB_SUCCESS) { @@ -428,8 +425,8 @@ buf_dblwr_init_or_load_pages( /* Read the pages from the doublewrite buffer to memory */ err = os_file_read( read_request, - file, buf, block1 * UNIV_PAGE_SIZE, - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE); + file, buf, block1 << srv_page_size_shift, + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift); if (err != DB_SUCCESS) { @@ -445,9 +442,9 @@ buf_dblwr_init_or_load_pages( err = os_file_read( read_request, file, - buf + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE, - block2 * UNIV_PAGE_SIZE, - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE); + buf + (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift), + block2 << srv_page_size_shift, + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift); if (err != DB_SUCCESS) { @@ -487,8 +484,8 @@ buf_dblwr_init_or_load_pages( err = os_file_write( write_request, path, file, page, - source_page_no * UNIV_PAGE_SIZE, - UNIV_PAGE_SIZE); + source_page_no << srv_page_size_shift, + srv_page_size); if (err != DB_SUCCESS) { ib::error() @@ -506,7 +503,7 @@ buf_dblwr_init_or_load_pages( recv_dblwr.add(page); } - page += univ_page_size.physical(); + page += srv_page_size; } if (reset_space_ids) { @@ -532,11 +529,11 @@ buf_dblwr_process() } unaligned_read_buf = static_cast<byte*>( - ut_malloc_nokey(3 * UNIV_PAGE_SIZE)); + ut_malloc_nokey(3U << srv_page_size_shift)); read_buf = static_cast<byte*>( - ut_align(unaligned_read_buf, UNIV_PAGE_SIZE)); - byte* const buf 
= read_buf + UNIV_PAGE_SIZE; + ut_align(unaligned_read_buf, srv_page_size)); + byte* const buf = read_buf + srv_page_size; for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin(); i != recv_dblwr.pages.end(); @@ -795,14 +792,14 @@ buf_dblwr_check_page_lsn( } if (memcmp(page + (FIL_PAGE_LSN + 4), - page + (UNIV_PAGE_SIZE + page + (srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM + 4), 4)) { const ulint lsn1 = mach_read_from_4( page + FIL_PAGE_LSN + 4); const ulint lsn2 = mach_read_from_4( - page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM + 4); ib::error() << "The page to be written seems corrupt!" @@ -847,6 +844,7 @@ buf_dblwr_check_block( switch (fil_page_get_type(block->frame)) { case FIL_PAGE_INDEX: + case FIL_PAGE_TYPE_INSTANT: case FIL_PAGE_RTREE: if (page_is_comp(block->frame)) { if (page_simple_validate_new(block->frame)) { @@ -879,7 +877,6 @@ buf_dblwr_check_block( case FIL_PAGE_TYPE_ALLOCATED: /* empty pages should never be flushed */ return; - break; } buf_dblwr_assert_on_corrupt_block(block); @@ -1005,7 +1002,7 @@ try_again: for (ulint len2 = 0, i = 0; i < buf_dblwr->first_free; - len2 += UNIV_PAGE_SIZE, i++) { + len2 += srv_page_size, i++) { const buf_block_t* block; @@ -1028,8 +1025,8 @@ try_again: } /* Write out the first block of the doublewrite buffer */ - len = ut_min(TRX_SYS_DOUBLEWRITE_BLOCK_SIZE, - buf_dblwr->first_free) * UNIV_PAGE_SIZE; + len = std::min<ulint>(TRX_SYS_DOUBLEWRITE_BLOCK_SIZE, + buf_dblwr->first_free) << srv_page_size_shift; fil_io(IORequestWrite, true, page_id_t(TRX_SYS_SPACE, buf_dblwr->block1), univ_page_size, @@ -1042,10 +1039,10 @@ try_again: /* Write out the second block of the doublewrite buffer. 
*/ len = (buf_dblwr->first_free - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) - * UNIV_PAGE_SIZE; + << srv_page_size_shift; write_buf = buf_dblwr->write_buf - + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE; + + (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift); fil_io(IORequestWrite, true, page_id_t(TRX_SYS_SPACE, buf_dblwr->block2), univ_page_size, @@ -1127,7 +1124,7 @@ try_again: } byte* p = buf_dblwr->write_buf - + univ_page_size.physical() * buf_dblwr->first_free; + + srv_page_size * buf_dblwr->first_free; /* We request frame here to get correct buffer in case of encryption and/or page compression */ @@ -1140,7 +1137,7 @@ try_again: memcpy(p, frame, bpage->size.physical()); memset(p + bpage->size.physical(), 0x0, - univ_page_size.physical() - bpage->size.physical()); + srv_page_size - bpage->size.physical()); } else { ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE); @@ -1270,20 +1267,20 @@ retry: void * frame = buf_page_get_frame(bpage); if (bpage->size.is_compressed()) { - memcpy(buf_dblwr->write_buf + univ_page_size.physical() * i, + memcpy(buf_dblwr->write_buf + srv_page_size * i, frame, bpage->size.physical()); - memset(buf_dblwr->write_buf + univ_page_size.physical() * i + memset(buf_dblwr->write_buf + srv_page_size * i + bpage->size.physical(), 0x0, - univ_page_size.physical() - bpage->size.physical()); + srv_page_size - bpage->size.physical()); fil_io(IORequestWrite, true, page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0, - univ_page_size.physical(), - (void *)(buf_dblwr->write_buf + univ_page_size.physical() * i), + srv_page_size, + (void *)(buf_dblwr->write_buf + srv_page_size * i), NULL); } else { /* It is a regular page. 
Write it directly to the @@ -1293,7 +1290,7 @@ retry: page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0, - univ_page_size.physical(), + srv_page_size, (void*) frame, NULL); } diff --git a/storage/innobase/buf/buf0dump.cc b/storage/innobase/buf/buf0dump.cc index 3e01b66eb76..487f044ec18 100644 --- a/storage/innobase/buf/buf0dump.cc +++ b/storage/innobase/buf/buf0dump.cc @@ -438,6 +438,11 @@ buf_dump( buf_dump_status(STATUS_INFO, "Buffer pool(s) dump completed at %s", now); + + /* Though dumping isn't related to an incomplete load, + we reset this to 0 here to indicate that a shutdown can also perform + a dump */ + export_vars.innodb_buffer_pool_load_incomplete = 0; } /*****************************************************************//** @@ -536,7 +541,7 @@ buf_load() buf_load_status(STATUS_INFO, "Loading buffer pool(s) from %s", full_filename); - f = fopen(full_filename, "r"); + f = fopen(full_filename, "r" STR_O_CLOEXEC); if (f == NULL) { buf_load_status(STATUS_INFO, "Cannot open '%s' for reading: %s", @@ -601,6 +606,8 @@ buf_load() rewind(f); + export_vars.innodb_buffer_pool_load_incomplete = 1; + for (i = 0; i < dump_n && !SHUTTING_DOWN(); i++) { fscanf_ret = fscanf(f, ULINTPF "," ULINTPF, &space_id, &page_no); @@ -649,7 +656,7 @@ buf_load() ut_sprintf_timestamp(now); buf_load_status(STATUS_INFO, "Buffer pool(s) load completed at %s" - " (%s was empty)", now, full_filename); + " (%s was empty or had errors)", now, full_filename); return; } @@ -689,7 +696,7 @@ buf_load() if (this_space_id != cur_space_id) { if (space != NULL) { - fil_space_release(space); + space->release(); } cur_space_id = this_space_id; @@ -721,7 +728,7 @@ buf_load() if (buf_load_abort_flag) { if (space != NULL) { - fil_space_release(space); + space->release(); } buf_load_abort_flag = FALSE; ut_free(dump); @@ -743,18 +750,39 @@ buf_load() buf_load_throttle_if_needed( &last_check_time, &last_activity_cnt, i); + +#ifdef UNIV_DEBUG + if ((i+1) >= srv_buf_pool_load_pages_abort) { + 
buf_load_abort_flag = 1; + } +#endif } if (space != NULL) { - fil_space_release(space); + space->release(); } ut_free(dump); ut_sprintf_timestamp(now); - buf_load_status(STATUS_INFO, + if (i == dump_n) { + buf_load_status(STATUS_INFO, "Buffer pool(s) load completed at %s", now); + export_vars.innodb_buffer_pool_load_incomplete = 0; + } else if (!buf_load_abort_flag) { + buf_load_status(STATUS_INFO, + "Buffer pool(s) load aborted due to user instigated abort at %s", + now); + /* intentionally don't reset innodb_buffer_pool_load_incomplete + as we don't want a shutdown to save the buffer pool */ + } else { + buf_load_status(STATUS_INFO, + "Buffer pool(s) load aborted due to shutdown at %s", + now); + /* intentionally don't reset innodb_buffer_pool_load_incomplete + as we want to abort without saving the buffer pool */ + } /* Make sure that estimated = completed when we end. */ /* mysql_stage_set_work_completed(pfs_stage_progress, dump_n); */ @@ -823,15 +851,16 @@ DECLARE_THREAD(buf_dump_thread)(void*) } if (srv_buffer_pool_dump_at_shutdown && srv_fast_shutdown != 2) { + if (export_vars.innodb_buffer_pool_load_incomplete) { + buf_dump_status(STATUS_INFO, + "Dumping of buffer pool not started" + " as load was incomplete"); #ifdef WITH_WSREP - if (!wsrep_recovery) { + } else if (wsrep_recovery) { #endif /* WITH_WSREP */ - - buf_dump(FALSE /* ignore shutdown down flag, - keep going even if we are in a shutdown state */); -#ifdef WITH_WSREP + } else { + buf_dump(FALSE/* do complete dump at shutdown */); } -#endif /* WITH_WSREP */ } srv_buf_dump_thread_active = false; diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 74df5ee2de8..fffeafbdf8c 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -31,7 +31,6 @@ Created 11/11/1995 Heikki Tuuri #include "buf0flu.h" #include "buf0buf.h" -#include "buf0mtflu.h" #include "buf0checksum.h" #include "srv0start.h" #include "srv0srv.h" @@ -151,6 +150,8 @@ struct 
page_cleaner_t { threads. */ os_event_t is_finished; /*!< event to signal that all slots were finished. */ + os_event_t is_started; /*!< event to signal that + thread is started/exiting */ volatile ulint n_workers; /*!< number of worker threads in existence */ bool requested; /*!< true if requested pages @@ -899,7 +900,7 @@ buf_flush_init_for_writing( /* Write the newest modification lsn to the page header and trailer */ mach_write_to_8(page + FIL_PAGE_LSN, newest_lsn); - mach_write_to_8(page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM, + mach_write_to_8(page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM, newest_lsn); if (block && srv_page_size == 16384) { @@ -930,6 +931,7 @@ buf_flush_init_for_writing( default: switch (page_type) { case FIL_PAGE_INDEX: + case FIL_PAGE_TYPE_INSTANT: case FIL_PAGE_RTREE: case FIL_PAGE_UNDO_LOG: case FIL_PAGE_INODE: @@ -993,7 +995,7 @@ buf_flush_init_for_writing( new enum is added and not handled here */ } - mach_write_to_4(page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM, + mach_write_to_4(page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM, checksum); } @@ -1018,7 +1020,7 @@ buf_flush_write_block_low( || space->purpose == FIL_TYPE_IMPORT || space->purpose == FIL_TYPE_TABLESPACE); ut_ad((space->purpose == FIL_TYPE_TEMPORARY) - == fsp_is_system_temporary(space->id)); + == (space == fil_system.temp_space)); page_t* frame = NULL; #ifdef UNIV_DEBUG buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); @@ -1128,7 +1130,7 @@ buf_flush_write_block_low( ut_ad(err == DB_SUCCESS); } - fil_space_release_for_io(space); + space->release_for_io(); /* Increment the counter of I/O operations used for selecting LRU policy. 
*/ @@ -1848,6 +1850,7 @@ not guaranteed that the actual number is that big, though) @param[in] lsn_limit in the case of BUF_FLUSH_LIST all blocks whose oldest_modification is smaller than this should be flushed (if their number does not exceed min_n), otherwise ignored */ +static void buf_flush_batch( buf_pool_t* buf_pool, @@ -1887,6 +1890,7 @@ Gather the aggregated stats for both flush list and LRU list flushing. @param page_count_flush number of pages flushed from the end of the flush_list @param page_count_LRU number of pages flushed from the end of the LRU list */ +static void buf_flush_stats( /*============*/ @@ -1903,6 +1907,7 @@ buf_flush_stats( /******************************************************************//** Start a buffer flush batch for LRU or flush list */ +static ibool buf_flush_start( /*============*/ @@ -1934,22 +1939,8 @@ buf_flush_start( } /******************************************************************//** -Gather the aggregated stats for both flush list and LRU list flushing */ -void -buf_flush_common( -/*=============*/ - buf_flush_t flush_type, /*!< in: type of flush */ - ulint page_count) /*!< in: number of pages flushed */ -{ - buf_dblwr_flush_buffered_writes(); - - ut_a(flush_type == BUF_FLUSH_LRU || flush_type == BUF_FLUSH_LIST); - - srv_stats.buf_pool_flushed.add(page_count); -} - -/******************************************************************//** End a buffer flush batch for LRU or flush list */ +static void buf_flush_end( /*==========*/ @@ -2121,10 +2112,6 @@ buf_flush_lists( ulint n_flushed = 0; bool success = true; - if (buf_mtflu_init_done()) { - return(buf_mtflu_flush_list(min_n, lsn_limit, n_processed)); - } - if (n_processed) { *n_processed = 0; } @@ -2283,11 +2270,6 @@ buf_flush_LRU_list( memset(&n, 0, sizeof(flush_counters_t)); - if(buf_mtflu_init_done()) - { - return(buf_mtflu_flush_LRU_tail()); - } - ut_ad(buf_pool); /* srv_LRU_scan_depth can be arbitrarily large value. We cap it with current LRU size. 
*/ @@ -2449,7 +2431,7 @@ page_cleaner_flush_pages_recommendation( cur_lsn = log_get_lsn_nowait(); - /* log_get_lsn_nowait tries to get log_sys->mutex with + /* log_get_lsn_nowait tries to get log_sys.mutex with mutex_enter_nowait, if this does not succeed function returns 0, do not use that value to update stats. */ if (cur_lsn == 0) { @@ -2713,7 +2695,7 @@ buf_flush_page_cleaner_init(void) page_cleaner.is_requested = os_event_create("pc_is_requested"); page_cleaner.is_finished = os_event_create("pc_is_finished"); - + page_cleaner.is_started = os_event_create("pc_is_started"); page_cleaner.n_slots = static_cast<ulint>(srv_buf_pool_instances); ut_d(page_cleaner.n_disabled_debug = 0); @@ -2788,8 +2770,8 @@ pc_flush_slot(void) { ulint lru_tm = 0; ulint list_tm = 0; - int lru_pass = 0; - int list_pass = 0; + ulint lru_pass = 0; + ulint list_pass = 0; mutex_enter(&page_cleaner.mutex); @@ -2993,17 +2975,10 @@ buf_flush_page_cleaner_disabled_loop(void) } /** Disables page cleaner threads (coordinator and workers). -It's used by: SET GLOBAL innodb_page_cleaner_disabled_debug = 1 (0). -@param[in] thd thread handle -@param[in] var pointer to system variable -@param[out] var_ptr where the formal string goes @param[in] save immediate result from check function */ -void -buf_flush_page_cleaner_disabled_debug_update( - THD* thd, - struct st_mysql_sys_var* var, - void* var_ptr, - const void* save) +void buf_flush_page_cleaner_disabled_debug_update(THD*, + st_mysql_sys_var*, void*, + const void* save) { if (!page_cleaner.is_running) { return; @@ -3433,6 +3408,7 @@ thread_exit: os_event_destroy(page_cleaner.is_finished); os_event_destroy(page_cleaner.is_requested); + os_event_destroy(page_cleaner.is_started); buf_page_cleaner_is_active = false; @@ -3444,6 +3420,35 @@ thread_exit: OS_THREAD_DUMMY_RETURN; } +/** Adjust thread count for page cleaner workers. 
+@param[in] new_cnt Number of threads to be used */ +void +buf_flush_set_page_cleaner_thread_cnt(ulong new_cnt) +{ + mutex_enter(&page_cleaner.mutex); + + srv_n_page_cleaners = new_cnt; + if (new_cnt > page_cleaner.n_workers) { + /* User has increased the number of page + cleaner threads. */ + ulint add = new_cnt - page_cleaner.n_workers; + for (ulint i = 0; i < add; i++) { + os_thread_id_t cleaner_thread_id; + os_thread_create(buf_flush_page_cleaner_worker, NULL, &cleaner_thread_id); + } + } + + mutex_exit(&page_cleaner.mutex); + + /* Wait until defined number of workers has started. */ + while (page_cleaner.is_running && + page_cleaner.n_workers != (srv_n_page_cleaners - 1)) { + os_event_set(page_cleaner.is_requested); + os_event_reset(page_cleaner.is_started); + os_event_wait_time(page_cleaner.is_started, 1000000); + } +} + /******************************************************************//** Worker thread of page_cleaner. @return a dummy parameter */ @@ -3456,9 +3461,18 @@ DECLARE_THREAD(buf_flush_page_cleaner_worker)( os_thread_create */ { my_thread_init(); +#ifndef DBUG_OFF + os_thread_id_t cleaner_thread_id = os_thread_get_curr_id(); +#endif mutex_enter(&page_cleaner.mutex); - page_cleaner.n_workers++; + ulint thread_no = page_cleaner.n_workers++; + + DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id + << " started; n_workers=" << page_cleaner.n_workers); + + /* Signal that we have started */ + os_event_set(page_cleaner.is_started); mutex_exit(&page_cleaner.mutex); #ifdef UNIV_LINUX @@ -3481,11 +3495,31 @@ DECLARE_THREAD(buf_flush_page_cleaner_worker)( break; } + ut_ad(srv_n_page_cleaners >= 1); + + /* If number of page cleaner threads is decreased + exit those that are not anymore needed. 
*/ + if (srv_shutdown_state == SRV_SHUTDOWN_NONE && + thread_no >= (srv_n_page_cleaners - 1)) { + DBUG_LOG("ib_buf", "Exiting " + << thread_no + << " page cleaner worker thread_id " + << os_thread_pf(cleaner_thread_id) + << " total threads " << srv_n_page_cleaners << "."); + break; + } + pc_flush_slot(); } mutex_enter(&page_cleaner.mutex); page_cleaner.n_workers--; + + DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id + << " exiting; n_workers=" << page_cleaner.n_workers); + + /* Signal that we have stopped */ + os_event_set(page_cleaner.is_started); mutex_exit(&page_cleaner.mutex); my_thread_end(); @@ -3690,17 +3724,17 @@ buf_flush_get_dirty_pages_count( } /** FlushObserver constructor -@param[in] space_id table space id +@param[in] space tablespace @param[in] trx trx instance @param[in] stage performance schema accounting object, used by ALTER TABLE. It is passed to log_preflush_pool_modified_pages() for accounting. */ FlushObserver::FlushObserver( - ulint space_id, + fil_space_t* space, trx_t* trx, ut_stage_alter_t* stage) : - m_space_id(space_id), + m_space(space), m_trx(trx), m_stage(stage), m_interrupted(false) @@ -3719,7 +3753,7 @@ FlushObserver::FlushObserver( /** FlushObserver deconstructor */ FlushObserver::~FlushObserver() { - ut_ad(buf_flush_get_dirty_pages_count(m_space_id, this) == 0); + ut_ad(buf_flush_get_dirty_pages_count(m_space->id, this) == 0); UT_DELETE(m_flushed); UT_DELETE(m_removed); @@ -3777,10 +3811,10 @@ FlushObserver::flush() if (!m_interrupted && m_stage) { m_stage->begin_phase_flush(buf_flush_get_dirty_pages_count( - m_space_id, this)); + m_space->id, this)); } - buf_LRU_flush_or_remove_pages(m_space_id, this); + buf_LRU_flush_or_remove_pages(m_space->id, this); /* Wait for all dirty pages were flushed. 
*/ for (ulint i = 0; i < srv_buf_pool_instances; i++) { diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc index 8673c8d9d72..cfe3f9a6bcb 100644 --- a/storage/innobase/buf/buf0lru.cc +++ b/storage/innobase/buf/buf0lru.cc @@ -58,9 +58,6 @@ static const ulint BUF_LRU_OLD_TOLERANCE = 20; (that is, when there are more than BUF_LRU_OLD_MIN_LEN blocks). @see buf_LRU_old_adjust_len */ #define BUF_LRU_NON_OLD_MIN_LEN 5 -#if BUF_LRU_NON_OLD_MIN_LEN >= BUF_LRU_OLD_MIN_LEN -# error "BUF_LRU_NON_OLD_MIN_LEN >= BUF_LRU_OLD_MIN_LEN" -#endif /** When dropping the search hash index entries before deleting an ibd file, we build a local array of pages belonging to that tablespace @@ -371,7 +368,7 @@ bool buf_LRU_drop_page_hash_for_tablespace(dict_table_t* table) return false; drop_ahi: - ulint id = table->space; + ulint id = table->space_id; for (ulint i = 0; i < srv_buf_pool_instances; i++) { buf_LRU_drop_page_hash_for_tablespace(buf_pool_from_array(i), id); @@ -965,7 +962,7 @@ buf_LRU_get_free_only( assert_block_ahi_empty(block); buf_block_set_state(block, BUF_BLOCK_READY_FOR_USE); - UNIV_MEM_ALLOC(block->frame, UNIV_PAGE_SIZE); + UNIV_MEM_ALLOC(block->frame, srv_page_size); ut_ad(buf_pool_from_block(block) == buf_pool); @@ -1012,7 +1009,7 @@ buf_LRU_check_size_of_non_data_objects( " Check that your transactions do not set too many" " row locks, or review if" " innodb_buffer_pool_size=" - << (buf_pool->curr_size >> (20 - UNIV_PAGE_SIZE_SHIFT)) + << (buf_pool->curr_size >> (20U - srv_page_size_shift)) << "M could be bigger."; } else if (!recv_recovery_is_on() && buf_pool->curr_size == buf_pool->old_size @@ -1035,7 +1032,7 @@ buf_LRU_check_size_of_non_data_objects( " set too many row locks." " innodb_buffer_pool_size=" << (buf_pool->curr_size >> - (20 - UNIV_PAGE_SIZE_SHIFT)) << "M." + (20U - srv_page_size_shift)) << "M." 
" Starting the InnoDB Monitor to print" " diagnostics."; @@ -1222,9 +1219,11 @@ buf_LRU_old_adjust_len( ut_ad(buf_pool_mutex_own(buf_pool)); ut_ad(buf_pool->LRU_old_ratio >= BUF_LRU_OLD_RATIO_MIN); ut_ad(buf_pool->LRU_old_ratio <= BUF_LRU_OLD_RATIO_MAX); -#if BUF_LRU_OLD_RATIO_MIN * BUF_LRU_OLD_MIN_LEN <= BUF_LRU_OLD_RATIO_DIV * (BUF_LRU_OLD_TOLERANCE + 5) -# error "BUF_LRU_OLD_RATIO_MIN * BUF_LRU_OLD_MIN_LEN <= BUF_LRU_OLD_RATIO_DIV * (BUF_LRU_OLD_TOLERANCE + 5)" -#endif + compile_time_assert(BUF_LRU_OLD_RATIO_MIN * BUF_LRU_OLD_MIN_LEN + > BUF_LRU_OLD_RATIO_DIV + * (BUF_LRU_OLD_TOLERANCE + 5)); + compile_time_assert(BUF_LRU_NON_OLD_MIN_LEN < BUF_LRU_OLD_MIN_LEN); + #ifdef UNIV_LRU_DEBUG /* buf_pool->LRU_old must be the first item in the LRU list whose "old" flag is set. */ @@ -1768,10 +1767,10 @@ func_exit: order to avoid bogus Valgrind warnings.*/ UNIV_MEM_VALID(((buf_block_t*) bpage)->frame, - UNIV_PAGE_SIZE); + srv_page_size); btr_search_drop_page_hash_index((buf_block_t*) bpage); UNIV_MEM_INVALID(((buf_block_t*) bpage)->frame, - UNIV_PAGE_SIZE); + srv_page_size); if (b != NULL) { @@ -1837,10 +1836,10 @@ buf_LRU_block_free_non_file_page( buf_block_set_state(block, BUF_BLOCK_NOT_USED); - UNIV_MEM_ALLOC(block->frame, UNIV_PAGE_SIZE); + UNIV_MEM_ALLOC(block->frame, srv_page_size); #ifdef UNIV_DEBUG /* Wipe contents of page to reveal possible stale pointers to it */ - memset(block->frame, '\0', UNIV_PAGE_SIZE); + memset(block->frame, '\0', srv_page_size); #else /* Wipe page_no and space_id */ memset(block->frame + FIL_PAGE_OFFSET, 0xfe, 4); @@ -1881,7 +1880,7 @@ buf_LRU_block_free_non_file_page( ut_d(block->page.in_free_list = TRUE); } - UNIV_MEM_FREE(block->frame, UNIV_PAGE_SIZE); + UNIV_MEM_FREE(block->frame, srv_page_size); } /******************************************************************//** @@ -1930,7 +1929,7 @@ buf_LRU_block_remove_hashed( case BUF_BLOCK_FILE_PAGE: UNIV_MEM_ASSERT_W(bpage, sizeof(buf_block_t)); UNIV_MEM_ASSERT_W(((buf_block_t*) 
bpage)->frame, - UNIV_PAGE_SIZE); + srv_page_size); buf_block_modify_clock_inc((buf_block_t*) bpage); if (bpage->zip.data) { const page_t* page = ((buf_block_t*) bpage)->frame; @@ -1959,11 +1958,11 @@ buf_LRU_block_remove_hashed( break; case FIL_PAGE_INDEX: case FIL_PAGE_RTREE: -#ifdef UNIV_ZIP_DEBUG +#if defined UNIV_ZIP_DEBUG && defined BTR_CUR_HASH_ADAPT ut_a(page_zip_validate( &bpage->zip, page, ((buf_block_t*) bpage)->index)); -#endif /* UNIV_ZIP_DEBUG */ +#endif /* UNIV_ZIP_DEBUG && BTR_CUR_HASH_ADAPT */ break; default: ib::error() << "The compressed page to be" @@ -2079,7 +2078,7 @@ buf_LRU_block_remove_hashed( memset(((buf_block_t*) bpage)->frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, 0xff, 4); UNIV_MEM_INVALID(((buf_block_t*) bpage)->frame, - UNIV_PAGE_SIZE); + srv_page_size); buf_page_set_state(bpage, BUF_BLOCK_REMOVE_HASH); /* Question: If we release bpage and hash mutex here diff --git a/storage/innobase/buf/buf0mtflu.cc b/storage/innobase/buf/buf0mtflu.cc deleted file mode 100644 index aae90e48168..00000000000 --- a/storage/innobase/buf/buf0mtflu.cc +++ /dev/null @@ -1,736 +0,0 @@ -/***************************************************************************** - -Copyright (C) 2013, 2014, Fusion-io. All Rights Reserved. -Copyright (C) 2013, 2017, MariaDB Corporation. All Rights Reserved. - -This program is free software; you can redistribute it and/or modify it under -the terms of the GNU General Public License as published by the Free Software -Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, but WITHOUT -ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License along with -this program; if not, write to the Free Software Foundation, Inc., -51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA - -*****************************************************************************/ - -/******************************************************************//** -@file buf/buf0mtflu.cc -Multi-threaded flush method implementation - -Created 06/11/2013 Dhananjoy Das DDas@fusionio.com -Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com -Modified 03/02/2014 Dhananjoy Das DDas@fusionio.com -Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com -***********************************************************************/ - -#include "buf0buf.h" -#include "buf0flu.h" -#include "buf0mtflu.h" -#include "buf0checksum.h" -#include "srv0start.h" -#include "srv0srv.h" -#include "page0zip.h" -#include "ut0byte.h" -#include "ut0lst.h" -#include "page0page.h" -#include "fil0fil.h" -#include "buf0lru.h" -#include "buf0rea.h" -#include "ibuf0ibuf.h" -#include "log0log.h" -#include "os0file.h" -#include "trx0sys.h" -#include "srv0mon.h" -#include "mysql/plugin.h" -#include "mysql/service_thd_wait.h" -#include "fil0pagecompress.h" - -#define MT_COMP_WATER_MARK 50 -/** Time to wait for a message. 
*/ -#define MT_WAIT_IN_USECS 5000000 - -/* Work item status */ -typedef enum wrk_status { - WRK_ITEM_UNSET=0, /*!< Work item is not set */ - WRK_ITEM_START=1, /*!< Processing of work item has started */ - WRK_ITEM_DONE=2, /*!< Processing is done usually set to - SUCCESS/FAILED */ - WRK_ITEM_SUCCESS=2, /*!< Work item successfully processed */ - WRK_ITEM_FAILED=3, /*!< Work item process failed */ - WRK_ITEM_EXIT=4, /*!< Exiting */ - WRK_ITEM_SET=5, /*!< Work item is set */ - WRK_ITEM_STATUS_UNDEFINED -} wrk_status_t; - -/* Work item task type */ -typedef enum mt_wrk_tsk { - MT_WRK_NONE=0, /*!< Exit queue-wait */ - MT_WRK_WRITE=1, /*!< Flush operation */ - MT_WRK_READ=2, /*!< Read operation */ - MT_WRK_UNDEFINED -} mt_wrk_tsk_t; - -/* Work thread status */ -typedef enum wthr_status { - WTHR_NOT_INIT=0, /*!< Work thread not initialized */ - WTHR_INITIALIZED=1, /*!< Work thread initialized */ - WTHR_SIG_WAITING=2, /*!< Work thread wating signal */ - WTHR_RUNNING=3, /*!< Work thread running */ - WTHR_NO_WORK=4, /*!< Work thread has no work */ - WTHR_KILL_IT=5, /*!< Work thread should exit */ - WTHR_STATUS_UNDEFINED -} wthr_status_t; - -/* Write work task */ -typedef struct wr_tsk { - buf_pool_t *buf_pool; /*!< buffer-pool instance */ - buf_flush_t flush_type; /*!< flush-type for buffer-pool - flush operation */ - ulint min; /*!< minimum number of pages - requested to be flushed */ - lsn_t lsn_limit; /*!< lsn limit for the buffer-pool - flush operation */ -} wr_tsk_t; - -/* Read work task */ -typedef struct rd_tsk { - buf_pool_t *page_pool; /*!< list of pages to decompress; */ -} rd_tsk_t; - -/* Work item */ -typedef struct wrk_itm -{ - mt_wrk_tsk_t tsk; /*!< Task type. 
Based on task-type - one of the entries wr_tsk/rd_tsk - will be used */ - wr_tsk_t wr; /*!< Flush page list */ - rd_tsk_t rd; /*!< Decompress page list */ - ulint n_flushed; /*!< Number of flushed pages */ - ulint n_evicted; /*!< Number of evicted pages */ - os_thread_id_t id_usr; /*!< Thread-id currently working */ - wrk_status_t wi_status; /*!< Work item status */ - mem_heap_t *wheap; /*!< Heap were to allocate memory - for queue nodes */ - mem_heap_t *rheap; -} wrk_t; - -struct thread_data_t -{ - os_thread_id_t wthread_id; /*!< Identifier */ - wthr_status_t wt_status; /*!< Worker thread status */ -}; - -/** Flush dirty pages when multi-threaded flush is used. */ -extern "C" UNIV_INTERN -os_thread_ret_t -DECLARE_THREAD(mtflush_io_thread)(void* arg); - -/** Thread syncronization data */ -struct thread_sync_t -{ - /** Constructor */ - thread_sync_t(ulint n_threads, mem_heap_t* wheap, mem_heap_t* rheap) : - thread_global_mtx(), n_threads(n_threads), - wq(ib_wqueue_create()), - wr_cq(ib_wqueue_create()), - rd_cq(ib_wqueue_create()), - wheap(wheap), rheap(rheap), gwt_status(), - thread_data(static_cast<thread_data_t*>( - mem_heap_zalloc(wheap, n_threads - * sizeof *thread_data))) - { - ut_a(wq); - ut_a(wr_cq); - ut_a(rd_cq); - ut_a(thread_data); - - mutex_create(LATCH_ID_MTFLUSH_THREAD_MUTEX, - &thread_global_mtx); - - /* Create threads for page-compression-flush */ - for(ulint i = 0; i < n_threads; i++) { - thread_data[i].wt_status = WTHR_INITIALIZED; - os_thread_create(mtflush_io_thread, this, - &thread_data[i].wthread_id); - } - } - - /** Destructor */ - ~thread_sync_t() - { - ut_a(ib_wqueue_is_empty(wq)); - ut_a(ib_wqueue_is_empty(wr_cq)); - ut_a(ib_wqueue_is_empty(rd_cq)); - - /* Free all queues */ - ib_wqueue_free(wq); - ib_wqueue_free(wr_cq); - ib_wqueue_free(rd_cq); - - mutex_free(&thread_global_mtx); - - mem_heap_free(rheap); - mem_heap_free(wheap); - } - - /* Global variables used by all threads */ - ib_mutex_t thread_global_mtx; /*!< Mutex used protecting 
below - variables */ - ulint n_threads; /*!< Number of threads */ - ib_wqueue_t *wq; /*!< Work Queue */ - ib_wqueue_t *wr_cq; /*!< Write Completion Queue */ - ib_wqueue_t *rd_cq; /*!< Read Completion Queue */ - mem_heap_t* wheap; /*!< Work heap where memory - is allocated */ - mem_heap_t* rheap; /*!< Work heap where memory - is allocated */ - wthr_status_t gwt_status; /*!< Global thread status */ - - /* Variables used by only one thread at a time */ - thread_data_t* thread_data; /*!< Thread specific data */ -}; - -static thread_sync_t* mtflush_ctx; -static ib_mutex_t mtflush_mtx; - -/******************************************************************//** -Return true if multi-threaded flush is initialized -@return true if initialized */ -bool -buf_mtflu_init_done(void) -/*=====================*/ -{ - return(mtflush_ctx != NULL); -} - -/******************************************************************//** -Fush buffer pool instance. -@return number of flushed pages, or 0 if error happened -*/ -static -ulint -buf_mtflu_flush_pool_instance( -/*==========================*/ - wrk_t *work_item) /*!< inout: work item to be flushed */ -{ - flush_counters_t n; - ut_a(work_item != NULL); - ut_a(work_item->wr.buf_pool != NULL); - - if (!buf_flush_start(work_item->wr.buf_pool, work_item->wr.flush_type)) { - /* We have two choices here. If lsn_limit was - specified then skipping an instance of buffer - pool means we cannot guarantee that all pages - up to lsn_limit has been flushed. We can - return right now with failure or we can try - to flush remaining buffer pools up to the - lsn_limit. We attempt to flush other buffer - pools based on the assumption that it will - help in the retry which will follow the - failure. 
*/ -#ifdef UNIV_MTFLUSH_DEBUG - fprintf(stderr, "InnoDB: Note: buf flush start failed there is already active flush for this buffer pool.\n"); -#endif - return 0; - } - - memset(&n, 0, sizeof(flush_counters_t)); - - if (work_item->wr.flush_type == BUF_FLUSH_LRU) { - /* srv_LRU_scan_depth can be arbitrarily large value. - * We cap it with current LRU size. - */ - buf_pool_mutex_enter(work_item->wr.buf_pool); - work_item->wr.min = UT_LIST_GET_LEN(work_item->wr.buf_pool->LRU); - buf_pool_mutex_exit(work_item->wr.buf_pool); - work_item->wr.min = ut_min((ulint)srv_LRU_scan_depth,(ulint)work_item->wr.min); - } - - buf_flush_batch(work_item->wr.buf_pool, - work_item->wr.flush_type, - work_item->wr.min, - work_item->wr.lsn_limit, - &n); - - buf_flush_end(work_item->wr.buf_pool, work_item->wr.flush_type); - buf_flush_common(work_item->wr.flush_type, n.flushed); - work_item->n_flushed = n.flushed; - work_item->n_evicted = n.evicted; - - return work_item->n_flushed; -} - -/******************************************************************//** -Worker function to wait for work items and processing them and -sending reply back. 
-*/ -static -void -mtflush_service_io( -/*===============*/ - thread_sync_t* mtflush_io, /*!< inout: multi-threaded flush - syncronization data */ - thread_data_t* thread_data) /* Thread status data */ -{ - wrk_t *work_item = NULL; - ulint n_flushed=0; - - ut_a(mtflush_io != NULL); - ut_a(thread_data != NULL); - - thread_data->wt_status = WTHR_SIG_WAITING; - - work_item = (wrk_t *)ib_wqueue_nowait(mtflush_io->wq); - - if (work_item == NULL) { - work_item = (wrk_t *)ib_wqueue_wait(mtflush_io->wq); - } - - if (work_item) { - thread_data->wt_status = WTHR_RUNNING; - } else { - /* Thread did not get any work */ - thread_data->wt_status = WTHR_NO_WORK; - return; - } - - if (work_item->wi_status != WRK_ITEM_EXIT) { - work_item->wi_status = WRK_ITEM_SET; - } - -#ifdef UNIV_MTFLUSH_DEBUG - ut_a(work_item->id_usr == 0); -#endif - work_item->id_usr = os_thread_get_curr_id(); - - /* This works as a producer/consumer model, where in tasks are - * inserted into the work-queue (wq) and completions are based - * on the type of operations performed and as a result the WRITE/ - * compression/flush operation completions get posted to wr_cq. - * And READ/decompress operations completions get posted to rd_cq. - * in future we may have others. 
- */ - - switch(work_item->tsk) { - case MT_WRK_NONE: - ut_a(work_item->wi_status == WRK_ITEM_EXIT); - work_item->wi_status = WRK_ITEM_EXIT; - ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap); - thread_data->wt_status = WTHR_KILL_IT; - break; - - case MT_WRK_WRITE: - ut_a(work_item->wi_status == WRK_ITEM_SET); - work_item->wi_status = WRK_ITEM_START; - /* Process work item */ - if (0 == (n_flushed = buf_mtflu_flush_pool_instance(work_item))) { - work_item->wi_status = WRK_ITEM_FAILED; - } - work_item->wi_status = WRK_ITEM_SUCCESS; - ib_wqueue_add(mtflush_io->wr_cq, work_item, work_item->rheap); - break; - - case MT_WRK_READ: - ut_a(0); - break; - - default: - /* None other than Write/Read handling planned */ - ut_a(0); - break; - } -} - -/** Flush dirty pages when multi-threaded flush is used. */ -extern "C" UNIV_INTERN -os_thread_ret_t -DECLARE_THREAD(mtflush_io_thread)(void* arg) -{ - thread_sync_t *mtflush_io = ((thread_sync_t *)arg); - thread_data_t *this_thread_data = NULL; - ulint i; - - /* Find correct slot for this thread */ - mutex_enter(&(mtflush_io->thread_global_mtx)); - for(i=0; i < mtflush_io->n_threads; i ++) { - if (mtflush_io->thread_data[i].wthread_id == os_thread_get_curr_id()) { - break; - } - } - - ut_a(i <= mtflush_io->n_threads); - this_thread_data = &mtflush_io->thread_data[i]; - mutex_exit(&(mtflush_io->thread_global_mtx)); - - while (TRUE) { - -#ifdef UNIV_MTFLUSH_DEBUG - fprintf(stderr, "InnoDB: Note. Thread %lu work queue len %lu return queue len %lu\n", - os_thread_get_curr_id(), - ib_wqueue_len(mtflush_io->wq), - ib_wqueue_len(mtflush_io->wr_cq)); -#endif /* UNIV_MTFLUSH_DEBUG */ - - mtflush_service_io(mtflush_io, this_thread_data); - - - if (this_thread_data->wt_status == WTHR_KILL_IT) { - break; - } - } - - os_thread_exit(); - OS_THREAD_DUMMY_RETURN; -} - -/******************************************************************//** -Add exit work item to work queue to signal multi-threded flush -threads that they should exit. 
-*/ -void -buf_mtflu_io_thread_exit(void) -/*==========================*/ -{ - ulint i; - thread_sync_t* mtflush_io = mtflush_ctx; - wrk_t* work_item = NULL; - - ut_a(mtflush_io != NULL); - - /* Allocate work items for shutdown message */ - work_item = (wrk_t*)mem_heap_alloc(mtflush_io->wheap, sizeof(wrk_t)*srv_mtflush_threads); - - /* Confirm if the io-thread KILL is in progress, bailout */ - if (mtflush_io->gwt_status == WTHR_KILL_IT) { - return; - } - - mtflush_io->gwt_status = WTHR_KILL_IT; - - /* This lock is to safequard against timing bug: flush request take - this mutex before sending work items to be processed by flush - threads. Inside flush thread we assume that work queue contains only - a constant number of items. Thus, we may not install new work items - below before all previous ones are processed. This mutex is released - by flush request after all work items sent to flush threads have - been processed. Thus, we can get this mutex if and only if work - queue is empty. */ - - mutex_enter(&mtflush_mtx); - - /* Make sure the work queue is empty */ - ut_a(ib_wqueue_is_empty(mtflush_io->wq)); - - /* Send one exit work item/thread */ - for (i=0; i < (ulint)srv_mtflush_threads; i++) { - work_item[i].tsk = MT_WRK_NONE; - work_item[i].wi_status = WRK_ITEM_EXIT; - work_item[i].wheap = mtflush_io->wheap; - work_item[i].rheap = mtflush_io->rheap; - work_item[i].id_usr = 0; - - ib_wqueue_add(mtflush_io->wq, - (void *)&(work_item[i]), - mtflush_io->wheap); - } - - /* Requests sent */ - mutex_exit(&mtflush_mtx); - - /* Wait until all work items on a work queue are processed */ - while(!ib_wqueue_is_empty(mtflush_io->wq)) { - /* Wait */ - os_thread_sleep(MT_WAIT_IN_USECS); - } - - ut_a(ib_wqueue_is_empty(mtflush_io->wq)); - - /* Collect all work done items */ - for (i=0; i < (ulint)srv_mtflush_threads;) { - wrk_t* work_item = NULL; - - work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS); - - /* If we receive reply to work item and it's 
status is exit, - thead has processed this message and existed */ - if (work_item && work_item->wi_status == WRK_ITEM_EXIT) { - i++; - } - } - - /* Wait about 1/2 sec to allow threads really exit */ - os_thread_sleep(MT_WAIT_IN_USECS); - - /* Make sure that work queue is empty */ - while(!ib_wqueue_is_empty(mtflush_io->wq)) - { - ib_wqueue_nowait(mtflush_io->wq); - } - - mtflush_ctx->~thread_sync_t(); - mtflush_ctx = NULL; - - mutex_free(&mtflush_mtx); -} - -/******************************************************************//** -Initialize multi-threaded flush thread syncronization data. -@return Initialized multi-threaded flush thread syncroniztion data. */ -void* -buf_mtflu_handler_init( -/*===================*/ - ulint n_threads, /*!< in: Number of threads to create */ - ulint wrk_cnt) /*!< in: Number of work items */ -{ - mem_heap_t* mtflush_heap; - mem_heap_t* mtflush_heap2; - - /* Create heap, work queue, write completion queue, read - completion queue for multi-threaded flush, and init - handler. */ - mtflush_heap = mem_heap_create(0); - ut_a(mtflush_heap != NULL); - mtflush_heap2 = mem_heap_create(0); - ut_a(mtflush_heap2 != NULL); - - mutex_create(LATCH_ID_MTFLUSH_MUTEX, &mtflush_mtx); - - mtflush_ctx = new (mem_heap_zalloc(mtflush_heap, sizeof *mtflush_ctx)) - thread_sync_t(n_threads, mtflush_heap, mtflush_heap2); - - return((void *)mtflush_ctx); -} - -/******************************************************************//** -Flush buffer pool instances. -@return number of pages flushed. 
*/ -ulint -buf_mtflu_flush_work_items( -/*=======================*/ - ulint buf_pool_inst, /*!< in: Number of buffer pool instances */ - flush_counters_t *per_pool_cnt, /*!< out: Number of pages - flushed or evicted /instance */ - buf_flush_t flush_type, /*!< in: Type of flush */ - ulint min_n, /*!< in: Wished minimum number of - blocks to be flushed */ - lsn_t lsn_limit) /*!< in: All blocks whose - oldest_modification is smaller than - this should be flushed (if their - number does not exceed min_n) */ -{ - ulint n_flushed=0, i; - mem_heap_t* work_heap; - mem_heap_t* reply_heap; - wrk_t work_item[MTFLUSH_MAX_WORKER]; - - if (mtflush_ctx->gwt_status == WTHR_KILL_IT) { - return 0; - } - - /* Allocate heap where all work items used and queue - node items areallocated */ - work_heap = mem_heap_create(0); - reply_heap = mem_heap_create(0); - - - for(i=0;i<buf_pool_inst; i++) { - work_item[i].tsk = MT_WRK_WRITE; - work_item[i].wr.buf_pool = buf_pool_from_array(i); - work_item[i].wr.flush_type = flush_type; - work_item[i].wr.min = min_n; - work_item[i].wr.lsn_limit = lsn_limit; - work_item[i].wi_status = WRK_ITEM_UNSET; - work_item[i].wheap = work_heap; - work_item[i].rheap = reply_heap; - work_item[i].n_flushed = 0; - work_item[i].n_evicted = 0; - work_item[i].id_usr = 0; - - ib_wqueue_add(mtflush_ctx->wq, - (void *)(work_item + i), - work_heap); - } - - /* wait on the completion to arrive */ - for(i=0; i< buf_pool_inst;) { - wrk_t *done_wi = NULL; - done_wi = (wrk_t *)ib_wqueue_wait(mtflush_ctx->wr_cq); - - if (done_wi != NULL) { - per_pool_cnt[i].flushed = done_wi->n_flushed; - per_pool_cnt[i].evicted = done_wi->n_evicted; - -#ifdef UNIV_MTFLUSH_DEBUG - if((int)done_wi->id_usr == 0 && - (done_wi->wi_status == WRK_ITEM_SET || - done_wi->wi_status == WRK_ITEM_UNSET)) { - fprintf(stderr, - "**Set/Unused work_item[%lu] flush_type=%d\n", - i, - done_wi->wr.flush_type); - ut_a(0); - } -#endif - - n_flushed+= done_wi->n_flushed+done_wi->n_evicted; - i++; - } - } - - /* 
Release used work_items and queue nodes */ - mem_heap_free(work_heap); - mem_heap_free(reply_heap); - - return(n_flushed); -} - -/*******************************************************************//** -Multi-threaded version of buf_flush_list -*/ -bool -buf_mtflu_flush_list( -/*=================*/ - ulint min_n, /*!< in: wished minimum mumber of blocks - flushed (it is not guaranteed that the - actual number is that big, though) */ - lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all - blocks whose oldest_modification is - smaller than this should be flushed - (if their number does not exceed - min_n), otherwise ignored */ - ulint* n_processed) /*!< out: the number of pages - which were processed is passed - back to caller. Ignored if NULL */ - -{ - ulint i; - bool success = true; - flush_counters_t cnt[MTFLUSH_MAX_WORKER]; - - if (n_processed) { - *n_processed = 0; - } - - if (min_n != ULINT_MAX) { - /* Ensure that flushing is spread evenly amongst the - buffer pool instances. When min_n is ULINT_MAX - we need to flush everything up to the lsn limit - so no limit here. */ - min_n = (min_n + srv_buf_pool_instances - 1) - / srv_buf_pool_instances; - } - - /* This lock is to safequard against re-entry if any. 
*/ - mutex_enter(&mtflush_mtx); - buf_mtflu_flush_work_items(srv_buf_pool_instances, - cnt, BUF_FLUSH_LIST, - min_n, lsn_limit); - mutex_exit(&mtflush_mtx); - - for (i = 0; i < srv_buf_pool_instances; i++) { - if (n_processed) { - *n_processed += cnt[i].flushed+cnt[i].evicted; - } - - if (cnt[i].flushed) { - MONITOR_INC_VALUE_CUMULATIVE( - MONITOR_FLUSH_BATCH_TOTAL_PAGE, - MONITOR_FLUSH_BATCH_COUNT, - MONITOR_FLUSH_BATCH_PAGES, - cnt[i].flushed); - } - - if(cnt[i].evicted) { - MONITOR_INC_VALUE_CUMULATIVE( - MONITOR_LRU_BATCH_EVICT_TOTAL_PAGE, - MONITOR_LRU_BATCH_EVICT_COUNT, - MONITOR_LRU_BATCH_EVICT_PAGES, - cnt[i].evicted); - } - } -#ifdef UNIV_MTFLUSH_DEBUG - fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu ]\n", - __FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed); -#endif - return(success); -} - -/*********************************************************************//** -Clears up tail of the LRU lists: -* Put replaceable pages at the tail of LRU to the free list -* Flush dirty pages at the tail of LRU to the disk -The depth to which we scan each buffer pool is controlled by dynamic -config parameter innodb_LRU_scan_depth. 
-@return total pages flushed */ -UNIV_INTERN -ulint -buf_mtflu_flush_LRU_tail(void) -/*==========================*/ -{ - ulint total_flushed=0, i; - flush_counters_t cnt[MTFLUSH_MAX_WORKER]; - - ut_a(buf_mtflu_init_done()); - - /* At shutdown do not send requests anymore */ - if (!mtflush_ctx || mtflush_ctx->gwt_status == WTHR_KILL_IT) { - return (total_flushed); - } - - /* This lock is to safeguard against re-entry if any */ - mutex_enter(&mtflush_mtx); - buf_mtflu_flush_work_items(srv_buf_pool_instances, - cnt, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); - mutex_exit(&mtflush_mtx); - - for (i = 0; i < srv_buf_pool_instances; i++) { - total_flushed += cnt[i].flushed+cnt[i].evicted; - - if (cnt[i].flushed) { - MONITOR_INC_VALUE_CUMULATIVE( - MONITOR_LRU_BATCH_FLUSH_TOTAL_PAGE, - MONITOR_LRU_BATCH_FLUSH_COUNT, - MONITOR_LRU_BATCH_FLUSH_PAGES, - cnt[i].flushed); - } - - if(cnt[i].evicted) { - MONITOR_INC_VALUE_CUMULATIVE( - MONITOR_LRU_BATCH_EVICT_TOTAL_PAGE, - MONITOR_LRU_BATCH_EVICT_COUNT, - MONITOR_LRU_BATCH_EVICT_PAGES, - cnt[i].evicted); - } - } - -#if UNIV_MTFLUSH_DEBUG - fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu ]\n", ( - srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed); -#endif - - return(total_flushed); -} - -/*********************************************************************//** -Set correct thread identifiers to io thread array based on -information we have. 
*/ -void -buf_mtflu_set_thread_ids( -/*=====================*/ - ulint n_threads, /*!<in: Number of threads to fill */ - void* ctx, /*!<in: thread context */ - os_thread_id_t* thread_ids) /*!<in: thread id array */ -{ - thread_sync_t *mtflush_io = ((thread_sync_t *)ctx); - ulint i; - ut_a(mtflush_io != NULL); - ut_a(thread_ids != NULL); - - for(i = 0; i < n_threads; i++) { - thread_ids[i] = mtflush_io->thread_data[i].wthread_id; - } -} diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc index 188d0aa24b6..9dd5857df17 100644 --- a/storage/innobase/buf/buf0rea.cc +++ b/storage/innobase/buf/buf0rea.cc @@ -304,7 +304,7 @@ buf_read_ahead_random( if (high > space->size) { high = space->size; } - fil_space_release(space); + space->release(); } else { return(0); } @@ -587,7 +587,7 @@ buf_read_ahead_linear( if (fil_space_t* space = fil_space_acquire(page_id.space())) { space_size = space->size; - fil_space_release(space); + space->release(); if (high > space_size) { /* The area is not whole */ @@ -815,7 +815,7 @@ buf_read_ibuf_merge_pages( in the arrays */ { #ifdef UNIV_IBUF_DEBUG - ut_a(n_stored < UNIV_PAGE_SIZE); + ut_a(n_stored < srv_page_size); #endif for (ulint i = 0; i < n_stored; i++) { |