summaryrefslogtreecommitdiff
path: root/storage/innobase/row
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2019-10-29 22:37:12 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2019-11-15 18:09:30 +0100
commit5e62b6a5e06eb02cbde1e34e95e26f42d87fce02 (patch)
treee20f8c1d9cd084ae1b8a4d2bd33eec4027d9fa90 /storage/innobase/row
parent00ee8d85c925846acc76df2a6fc7c67a062c2ea6 (diff)
downloadmariadb-git-5e62b6a5e06eb02cbde1e34e95e26f42d87fce02.tar.gz
MDEV-16264 Use threadpool for Innodb background work.
Almost all threads have gone - the "ticking" threads, that sleep a while then do some work) (srv_monitor_thread, srv_error_monitor_thread, srv_master_thread) were replaced with timers. Some timers are periodic, e.g the "master" timer. - The btr_defragment_thread is also replaced by a timer , which reschedules it self when current defragment "item" needs throttling - the buf_resize_thread and buf_dump_threads are substitutes with tasks Ditto with page cleaner workers. - purge workers threads are not tasks as well, and purge cleaner coordinator is a combination of a task and timer. - All AIO is outsourced to tpool, Innodb just calls thread_pool::submit_io() and provides the callback. - The srv_slot_t was removed, and innodb_debug_sync used in purge is currently not working, and needs reimplementation.
Diffstat (limited to 'storage/innobase/row')
-rw-r--r--storage/innobase/row/row0ftsort.cc42
-rw-r--r--storage/innobase/row/row0merge.cc88
-rw-r--r--storage/innobase/row/row0mysql.cc2
-rw-r--r--storage/innobase/row/row0purge.cc20
4 files changed, 20 insertions, 132 deletions
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc
index 45a5fee59ec..ca3de3e2af0 100644
--- a/storage/innobase/row/row0ftsort.cc
+++ b/storage/innobase/row/row0ftsort.cc
@@ -216,7 +216,6 @@ row_fts_psort_info_init(
common_info->trx = trx;
common_info->all_info = psort_info;
common_info->sort_event = os_event_create(0);
- common_info->merge_event = os_event_create(0);
common_info->opt_doc_id_size = opt_doc_id_size;
if (log_tmp_is_encrypted()) {
@@ -350,7 +349,6 @@ row_fts_psort_info_destroy(
}
os_event_destroy(merge_info[0].psort_common->sort_event);
- os_event_destroy(merge_info[0].psort_common->merge_event);
ut_free(merge_info[0].psort_common->dup);
ut_free(merge_info[0].psort_common);
ut_free(psort_info);
@@ -754,10 +752,9 @@ row_merge_fts_get_next_doc_item(
/*********************************************************************//**
Function performs parallel tokenization of the incoming doc strings.
It also performs the initial in memory sort of the parsed records.
-@return OS_THREAD_DUMMY_RETURN */
+*/
static
-os_thread_ret_t
-DECLARE_THREAD(fts_parallel_tokenization)(
+void fts_parallel_tokenization(
/*======================*/
void* arg) /*!< in: psort_info for the thread */
{
@@ -1065,10 +1062,6 @@ func_exit:
psort_info->child_status = FTS_CHILD_COMPLETE;
os_event_set(psort_info->psort_common->sort_event);
psort_info->child_status = FTS_CHILD_EXITING;
-
- os_thread_exit();
-
- OS_THREAD_DUMMY_RETURN;
}
/*********************************************************************//**
@@ -1079,23 +1072,20 @@ row_fts_start_psort(
fts_psort_t* psort_info) /*!< parallel sort structure */
{
ulint i = 0;
- os_thread_id_t thd_id;
for (i = 0; i < fts_sort_pll_degree; i++) {
psort_info[i].psort_id = i;
- psort_info[i].thread_hdl =
- os_thread_create(fts_parallel_tokenization,
- (void*) &psort_info[i],
- &thd_id);
+ psort_info[i].task =
+ new tpool::waitable_task(fts_parallel_tokenization,&psort_info[i]);
+ srv_thread_pool->submit_task(psort_info[i].task);
}
}
/*********************************************************************//**
-Function performs the merge and insertion of the sorted records.
-@return OS_THREAD_DUMMY_RETURN */
+Function performs the merge and insertion of the sorted records. */
static
-os_thread_ret_t
-DECLARE_THREAD(fts_parallel_merge)(
+void
+fts_parallel_merge(
/*===============*/
void* arg) /*!< in: parallel merge info */
{
@@ -1109,14 +1099,6 @@ DECLARE_THREAD(fts_parallel_merge)(
row_fts_merge_insert(psort_info->psort_common->dup->index,
psort_info->psort_common->new_table,
psort_info->psort_common->all_info, id);
-
- psort_info->child_status = FTS_CHILD_COMPLETE;
- os_event_set(psort_info->psort_common->merge_event);
- psort_info->child_status = FTS_CHILD_EXITING;
-
- os_thread_exit(false);
-
- OS_THREAD_DUMMY_RETURN;
}
/*********************************************************************//**
@@ -1128,15 +1110,15 @@ row_fts_start_parallel_merge(
{
ulint i = 0;
- /* Kick off merge/insert threads */
+ /* Kick off merge/insert tasks */
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
merge_info[i].psort_id = i;
merge_info[i].child_status = 0;
- merge_info[i].thread_hdl = os_thread_create(
+ merge_info[i].task = new tpool::waitable_task(
fts_parallel_merge,
- (void*) &merge_info[i],
- &merge_info[i].thread_hdl);
+ (void*) &merge_info[i]);
+ srv_thread_pool->submit_task(merge_info[i].task);
}
}
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index dad2e060678..3718d34f7c7 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -2771,10 +2771,6 @@ all_done:
DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
#endif
if (fts_pll_sort) {
- bool all_exit = false;
- ulint trial_count = 0;
- const ulint max_trial_count = 10000;
-
wait_again:
/* Check if error occurs in child thread */
for (ulint j = 0; j < fts_sort_pll_degree; j++) {
@@ -2807,27 +2803,9 @@ wait_again:
}
}
- /* Now all children should complete, wait a bit until
- they all finish setting the event, before we free everything.
- This has a 10 second timeout */
- do {
- all_exit = true;
-
- for (ulint j = 0; j < fts_sort_pll_degree; j++) {
- if (psort_info[j].child_status
- != FTS_CHILD_EXITING) {
- all_exit = false;
- os_thread_sleep(1000);
- break;
- }
- }
- trial_count++;
- } while (!all_exit && trial_count < max_trial_count);
-
- if (!all_exit) {
- ib::fatal() << "Not all child sort threads exited"
- " when creating FTS index '"
- << fts_sort_idx->name << "'";
+ for (ulint j = 0; j < fts_sort_pll_degree; j++) {
+ psort_info[j].task->wait();
+ delete psort_info[j].task;
}
}
@@ -4109,7 +4087,7 @@ row_merge_file_create_low(
File f = create_temp_file(filename, path, "ib",
O_BINARY | O_SEQUENTIAL,
MYF(MY_WME | MY_TEMPORARY));
- pfs_os_file_t fd = IF_WIN(my_get_osfhandle(f), f);
+ pfs_os_file_t fd = IF_WIN((os_file_t)my_get_osfhandle(f), f);
#ifdef UNIV_PFS_IO
register_pfs_file_open_end(locker, fd,
@@ -4155,7 +4133,7 @@ row_merge_file_destroy_low(
const pfs_os_file_t& fd) /*!< in: merge file descriptor */
{
if (fd != OS_FILE_CLOSED) {
- int res = mysql_file_close(IF_WIN(my_win_handle2File(fd), fd),
+ int res = mysql_file_close(IF_WIN(my_win_handle2File((os_file_t)fd), fd),
MYF(MY_WME));
ut_a(res != -1);
}
@@ -4589,7 +4567,6 @@ row_merge_build_indexes(
dict_index_t* fts_sort_idx = NULL;
fts_psort_t* psort_info = NULL;
fts_psort_t* merge_info = NULL;
- int64_t sig_count = 0;
bool fts_psort_initiated = false;
double total_static_cost = 0;
@@ -4756,65 +4733,14 @@ row_merge_build_indexes(
}
if (indexes[i]->type & DICT_FTS) {
- os_event_t fts_parallel_merge_event;
sort_idx = fts_sort_idx;
- fts_parallel_merge_event
- = merge_info[0].psort_common->merge_event;
-
if (FTS_PLL_MERGE) {
- ulint trial_count = 0;
- bool all_exit = false;
-
- os_event_reset(fts_parallel_merge_event);
row_fts_start_parallel_merge(merge_info);
-wait_again:
- os_event_wait_time_low(
- fts_parallel_merge_event, 1000000,
- sig_count);
-
for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
- if (merge_info[j].child_status
- != FTS_CHILD_COMPLETE
- && merge_info[j].child_status
- != FTS_CHILD_EXITING) {
- sig_count = os_event_reset(
- fts_parallel_merge_event);
-
- goto wait_again;
- }
- }
-
- /* Now all children should complete, wait
- a bit until they all finish using event */
- while (!all_exit && trial_count < 10000) {
- all_exit = true;
-
- for (j = 0; j < FTS_NUM_AUX_INDEX;
- j++) {
- if (merge_info[j].child_status
- != FTS_CHILD_EXITING) {
- all_exit = false;
- os_thread_sleep(1000);
- break;
- }
- }
- trial_count++;
- }
-
- if (!all_exit) {
- ib::error() << "Not all child merge"
- " threads exited when creating"
- " FTS index '"
- << indexes[i]->name << "'";
- } else {
- for (j = 0; j < FTS_NUM_AUX_INDEX;
- j++) {
-
- os_thread_join(merge_info[j]
- .thread_hdl);
- }
+ merge_info[j].task->wait();
+ delete merge_info[j].task;
}
} else {
/* This cannot report duplicates; an
diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc
index da11fa2f948..c6a61ff4bf7 100644
--- a/storage/innobase/row/row0mysql.cc
+++ b/storage/innobase/row/row0mysql.cc
@@ -3435,7 +3435,7 @@ row_drop_table_for_mysql(
dict_stats_recalc_pool_del(table);
dict_stats_defrag_pool_del(table, NULL);
- if (btr_defragment_thread_active) {
+ if (btr_defragment_active) {
/* During fts_drop_orphaned_tables() in
recv_recovery_rollback_active() the
btr_defragment_mutex has not yet been
diff --git a/storage/innobase/row/row0purge.cc b/storage/innobase/row/row0purge.cc
index 4dee8de5aad..41731ed17a0 100644
--- a/storage/innobase/row/row0purge.cc
+++ b/storage/innobase/row/row0purge.cc
@@ -1311,26 +1311,6 @@ row_purge_step(
node->start();
-#ifdef UNIV_DEBUG
- srv_slot_t *slot = thr->thread_slot;
- ut_ad(slot);
-
- rw_lock_x_lock(&slot->debug_sync_lock);
- while (UT_LIST_GET_LEN(slot->debug_sync)) {
- srv_slot_t::debug_sync_t *sync =
- UT_LIST_GET_FIRST(slot->debug_sync);
- const char* sync_str = reinterpret_cast<char*>(&sync[1]);
- bool result = debug_sync_set_action(current_thd,
- sync_str,
- strlen(sync_str));
- ut_a(!result);
-
- UT_LIST_REMOVE(slot->debug_sync, sync);
- ut_free(sync);
- }
- rw_lock_x_unlock(&slot->debug_sync_lock);
-#endif
-
if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) {
trx_purge_rec_t*purge_rec;