diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2022-08-18 17:39:46 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2022-08-18 17:39:46 +0300 |
commit | 8b9043c6220b67a342ad4f5b8c70975e04410f91 (patch) | |
tree | df780bcfe7ee97a4e45fed05b43e23ebec1e68f5 | |
parent | b4fb14f5931b1bd97e811e01336391bba891b0ad (diff) | |
download | mariadb-git-bb-10.8-MDEV-29043.tar.gz |
MDEV-29043 mariabackup --compress hangsbb-10.8-MDEV-29043
compress_write(): Fix a race condition between threads that would
use the same worker thread object. Make thd->data_avail contain the
thread identifier of the submitter, and add thd->avail_cond to
notify other compress_write() threads that are waiting for a slot.
-rw-r--r-- | extra/mariabackup/ds_compress.cc | 51 |
1 files changed, 43 insertions, 8 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc index 64f342df462..f7a9b7a1fbd 100644 --- a/extra/mariabackup/ds_compress.cc +++ b/extra/mariabackup/ds_compress.cc @@ -34,9 +34,10 @@ typedef struct { pthread_t id; uint num; pthread_mutex_t data_mutex; + pthread_cond_t avail_cond; pthread_cond_t data_cond; pthread_cond_t done_cond; - my_bool data_avail; + pthread_t data_avail; my_bool cancelled; const char *from; size_t from_len; @@ -195,9 +196,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) threads = comp_ctxt->threads; nthreads = comp_ctxt->nthreads; + const pthread_t self = pthread_self(); + ptr = (const char *) buf; while (len > 0) { - uint max_thread; + bool wait = nthreads == 1; +retry: + bool submitted = false; /* Send data to worker threads for compression */ for (i = 0; i < nthreads; i++) { @@ -206,16 +211,33 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) thd = threads + i; pthread_mutex_lock(&thd->data_mutex); + if (thd->data_avail == pthread_t(~0UL)) { + } else if (!wait) { +skip: + pthread_mutex_unlock(&thd->data_mutex); + continue; + } else { + for (;;) { + pthread_cond_wait(&thd->avail_cond, + &thd->data_mutex); + if (thd->data_avail + == pthread_t(~0UL)) { + break; + } + goto skip; + } + } chunk_len = (len > COMPRESS_CHUNK_SIZE) ? COMPRESS_CHUNK_SIZE : len; thd->from = ptr; thd->from_len = chunk_len; - thd->data_avail = TRUE; + thd->data_avail = self; pthread_cond_signal(&thd->data_cond); pthread_mutex_unlock(&thd->data_mutex); + submitted = true; len -= chunk_len; if (len == 0) { break; @@ -223,13 +245,20 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) ptr += chunk_len; } - max_thread = (i < nthreads) ? i : nthreads - 1; + if (!submitted) { + wait = true; + goto retry; + } - /* Reap and stream the compressed data */ - for (i = 0; i <= max_thread; i++) { + for (i = 0; i < nthreads; i++) { thd = threads + i; pthread_mutex_lock(&thd->data_mutex); + if (thd->data_avail != self) { + pthread_mutex_unlock(&thd->data_mutex); + continue; + } + while (!thd->to_len) { pthread_cond_wait(&thd->done_cond, &thd->data_mutex); @@ -247,6 +276,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) } thd->to_len = 0; + thd->data_avail = pthread_t(~0UL); + pthread_cond_signal(&thd->avail_cond); pthread_mutex_unlock(&thd->data_mutex); if (fail) { @@ -334,6 +365,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd) pthread_join(thd->id, NULL); + pthread_cond_destroy(&thd->avail_cond); pthread_cond_destroy(&thd->data_cond); pthread_cond_destroy(&thd->done_cond); pthread_mutex_destroy(&thd->data_mutex); @@ -364,11 +396,14 @@ create_worker_threads(uint n) /* Initialize and data mutex and condition var */ if (pthread_mutex_init(&thd->data_mutex, NULL) || + pthread_cond_init(&thd->avail_cond, NULL) || pthread_cond_init(&thd->data_cond, NULL) || pthread_cond_init(&thd->done_cond, NULL)) { goto err; } + thd->data_avail = pthread_t(~0UL); + if (pthread_create(&thd->id, NULL, compress_worker_thread_func, thd)) { msg("compress: pthread_create() failed: " @@ -410,13 +445,13 @@ compress_worker_thread_func(void *arg) pthread_mutex_lock(&thd->data_mutex); while (1) { - while (!thd->data_avail && !thd->cancelled) { + while (!thd->cancelled + && (thd->to_len || thd->data_avail == pthread_t(~0UL))) { pthread_cond_wait(&thd->data_cond, &thd->data_mutex); } if (thd->cancelled) break; - thd->data_avail = FALSE; thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len, &thd->state); |