diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2019-10-29 22:37:12 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2019-11-15 18:09:30 +0100 |
commit | 5e62b6a5e06eb02cbde1e34e95e26f42d87fce02 (patch) | |
tree | e20f8c1d9cd084ae1b8a4d2bd33eec4027d9fa90 /storage/innobase/row | |
parent | 00ee8d85c925846acc76df2a6fc7c67a062c2ea6 (diff) | |
download | mariadb-git-5e62b6a5e06eb02cbde1e34e95e26f42d87fce02.tar.gz |
MDEV-16264 Use threadpool for Innodb background work.
Almost all threads have gone
- the "ticking" threads, that sleep a while then do some work)
(srv_monitor_thread, srv_error_monitor_thread, srv_master_thread)
were replaced with timers. Some timers are periodic,
e.g the "master" timer.
- The btr_defragment_thread is also replaced by a timer , which
reschedules it self when current defragment "item" needs throttling
- the buf_resize_thread and buf_dump_threads are substitutes with tasks
Ditto with page cleaner workers.
- purge workers threads are not tasks as well, and purge cleaner
coordinator is a combination of a task and timer.
- All AIO is outsourced to tpool, Innodb just calls thread_pool::submit_io()
and provides the callback.
- The srv_slot_t was removed, and innodb_debug_sync used in purge
is currently not working, and needs reimplementation.
Diffstat (limited to 'storage/innobase/row')
-rw-r--r-- | storage/innobase/row/row0ftsort.cc | 42 | ||||
-rw-r--r-- | storage/innobase/row/row0merge.cc | 88 | ||||
-rw-r--r-- | storage/innobase/row/row0mysql.cc | 2 | ||||
-rw-r--r-- | storage/innobase/row/row0purge.cc | 20 |
4 files changed, 20 insertions, 132 deletions
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc index 45a5fee59ec..ca3de3e2af0 100644 --- a/storage/innobase/row/row0ftsort.cc +++ b/storage/innobase/row/row0ftsort.cc @@ -216,7 +216,6 @@ row_fts_psort_info_init( common_info->trx = trx; common_info->all_info = psort_info; common_info->sort_event = os_event_create(0); - common_info->merge_event = os_event_create(0); common_info->opt_doc_id_size = opt_doc_id_size; if (log_tmp_is_encrypted()) { @@ -350,7 +349,6 @@ row_fts_psort_info_destroy( } os_event_destroy(merge_info[0].psort_common->sort_event); - os_event_destroy(merge_info[0].psort_common->merge_event); ut_free(merge_info[0].psort_common->dup); ut_free(merge_info[0].psort_common); ut_free(psort_info); @@ -754,10 +752,9 @@ row_merge_fts_get_next_doc_item( /*********************************************************************//** Function performs parallel tokenization of the incoming doc strings. It also performs the initial in memory sort of the parsed records. -@return OS_THREAD_DUMMY_RETURN */ +*/ static -os_thread_ret_t -DECLARE_THREAD(fts_parallel_tokenization)( +void fts_parallel_tokenization( /*======================*/ void* arg) /*!< in: psort_info for the thread */ { @@ -1065,10 +1062,6 @@ func_exit: psort_info->child_status = FTS_CHILD_COMPLETE; os_event_set(psort_info->psort_common->sort_event); psort_info->child_status = FTS_CHILD_EXITING; - - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; } /*********************************************************************//** @@ -1079,23 +1072,20 @@ row_fts_start_psort( fts_psort_t* psort_info) /*!< parallel sort structure */ { ulint i = 0; - os_thread_id_t thd_id; for (i = 0; i < fts_sort_pll_degree; i++) { psort_info[i].psort_id = i; - psort_info[i].thread_hdl = - os_thread_create(fts_parallel_tokenization, - (void*) &psort_info[i], - &thd_id); + psort_info[i].task = + new tpool::waitable_task(fts_parallel_tokenization,&psort_info[i]); + srv_thread_pool->submit_task(psort_info[i].task); } } /*********************************************************************//** -Function performs the merge and insertion of the sorted records. -@return OS_THREAD_DUMMY_RETURN */ +Function performs the merge and insertion of the sorted records. */ static -os_thread_ret_t -DECLARE_THREAD(fts_parallel_merge)( +void +fts_parallel_merge( /*===============*/ void* arg) /*!< in: parallel merge info */ { @@ -1109,14 +1099,6 @@ DECLARE_THREAD(fts_parallel_merge)( row_fts_merge_insert(psort_info->psort_common->dup->index, psort_info->psort_common->new_table, psort_info->psort_common->all_info, id); - - psort_info->child_status = FTS_CHILD_COMPLETE; - os_event_set(psort_info->psort_common->merge_event); - psort_info->child_status = FTS_CHILD_EXITING; - - os_thread_exit(false); - - OS_THREAD_DUMMY_RETURN; } /*********************************************************************//** @@ -1128,15 +1110,15 @@ row_fts_start_parallel_merge( { ulint i = 0; - /* Kick off merge/insert threads */ + /* Kick off merge/insert tasks */ for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { merge_info[i].psort_id = i; merge_info[i].child_status = 0; - merge_info[i].thread_hdl = os_thread_create( + merge_info[i].task = new tpool::waitable_task( fts_parallel_merge, - (void*) &merge_info[i], - &merge_info[i].thread_hdl); + (void*) &merge_info[i]); + srv_thread_pool->submit_task(merge_info[i].task); } } diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc index dad2e060678..3718d34f7c7 100644 --- a/storage/innobase/row/row0merge.cc +++ b/storage/innobase/row/row0merge.cc @@ -2771,10 +2771,6 @@ all_done: DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n"); #endif if (fts_pll_sort) { - bool all_exit = false; - ulint trial_count = 0; - const ulint max_trial_count = 10000; - wait_again: /* Check if error occurs in child thread */ for (ulint j = 0; j < fts_sort_pll_degree; j++) { @@ -2807,27 +2803,9 @@ wait_again: } } - /* Now all children should complete, wait a bit until - they all finish setting the event, before we free everything. - This has a 10 second timeout */ - do { - all_exit = true; - - for (ulint j = 0; j < fts_sort_pll_degree; j++) { - if (psort_info[j].child_status - != FTS_CHILD_EXITING) { - all_exit = false; - os_thread_sleep(1000); - break; - } - } - trial_count++; - } while (!all_exit && trial_count < max_trial_count); - - if (!all_exit) { - ib::fatal() << "Not all child sort threads exited" - " when creating FTS index '" - << fts_sort_idx->name << "'"; + for (ulint j = 0; j < fts_sort_pll_degree; j++) { + psort_info[j].task->wait(); + delete psort_info[j].task; } } @@ -4109,7 +4087,7 @@ row_merge_file_create_low( File f = create_temp_file(filename, path, "ib", O_BINARY | O_SEQUENTIAL, MYF(MY_WME | MY_TEMPORARY)); - pfs_os_file_t fd = IF_WIN(my_get_osfhandle(f), f); + pfs_os_file_t fd = IF_WIN((os_file_t)my_get_osfhandle(f), f); #ifdef UNIV_PFS_IO register_pfs_file_open_end(locker, fd, @@ -4155,7 +4133,7 @@ row_merge_file_destroy_low( const pfs_os_file_t& fd) /*!< in: merge file descriptor */ { if (fd != OS_FILE_CLOSED) { - int res = mysql_file_close(IF_WIN(my_win_handle2File(fd), fd), + int res = mysql_file_close(IF_WIN(my_win_handle2File((os_file_t)fd), fd), MYF(MY_WME)); ut_a(res != -1); } @@ -4589,7 +4567,6 @@ row_merge_build_indexes( dict_index_t* fts_sort_idx = NULL; fts_psort_t* psort_info = NULL; fts_psort_t* merge_info = NULL; - int64_t sig_count = 0; bool fts_psort_initiated = false; double total_static_cost = 0; @@ -4756,65 +4733,14 @@ row_merge_build_indexes( } if (indexes[i]->type & DICT_FTS) { - os_event_t fts_parallel_merge_event; sort_idx = fts_sort_idx; - fts_parallel_merge_event - = merge_info[0].psort_common->merge_event; - if (FTS_PLL_MERGE) { - ulint trial_count = 0; - bool all_exit = false; - - os_event_reset(fts_parallel_merge_event); row_fts_start_parallel_merge(merge_info); -wait_again: - os_event_wait_time_low( - fts_parallel_merge_event, 1000000, - sig_count); - for (j = 0; j < FTS_NUM_AUX_INDEX; j++) { - if (merge_info[j].child_status - != FTS_CHILD_COMPLETE - && merge_info[j].child_status - != FTS_CHILD_EXITING) { - sig_count = os_event_reset( - fts_parallel_merge_event); - - goto wait_again; - } - } - - /* Now all children should complete, wait - a bit until they all finish using event */ - while (!all_exit && trial_count < 10000) { - all_exit = true; - - for (j = 0; j < FTS_NUM_AUX_INDEX; - j++) { - if (merge_info[j].child_status - != FTS_CHILD_EXITING) { - all_exit = false; - os_thread_sleep(1000); - break; - } - } - trial_count++; - } - - if (!all_exit) { - ib::error() << "Not all child merge" - " threads exited when creating" - " FTS index '" - << indexes[i]->name << "'"; - } else { - for (j = 0; j < FTS_NUM_AUX_INDEX; - j++) { - - os_thread_join(merge_info[j] - .thread_hdl); - } + merge_info[j].task->wait(); + delete merge_info[j].task; } } else { /* This cannot report duplicates; an diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc index da11fa2f948..c6a61ff4bf7 100644 --- a/storage/innobase/row/row0mysql.cc +++ b/storage/innobase/row/row0mysql.cc @@ -3435,7 +3435,7 @@ row_drop_table_for_mysql( dict_stats_recalc_pool_del(table); dict_stats_defrag_pool_del(table, NULL); - if (btr_defragment_thread_active) { + if (btr_defragment_active) { /* During fts_drop_orphaned_tables() in recv_recovery_rollback_active() the btr_defragment_mutex has not yet been diff --git a/storage/innobase/row/row0purge.cc b/storage/innobase/row/row0purge.cc index 4dee8de5aad..41731ed17a0 100644 --- a/storage/innobase/row/row0purge.cc +++ b/storage/innobase/row/row0purge.cc @@ -1311,26 +1311,6 @@ row_purge_step( node->start(); -#ifdef UNIV_DEBUG - srv_slot_t *slot = thr->thread_slot; - ut_ad(slot); - - rw_lock_x_lock(&slot->debug_sync_lock); - while (UT_LIST_GET_LEN(slot->debug_sync)) { - srv_slot_t::debug_sync_t *sync = - UT_LIST_GET_FIRST(slot->debug_sync); - const char* sync_str = reinterpret_cast<char*>(&sync[1]); - bool result = debug_sync_set_action(current_thd, - sync_str, - strlen(sync_str)); - ut_a(!result); - - UT_LIST_REMOVE(slot->debug_sync, sync); - ut_free(sync); - } - rw_lock_x_unlock(&slot->debug_sync_lock); -#endif - if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) { trx_purge_rec_t*purge_rec; |