summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@mariadb.com>2022-08-18 17:39:46 +0300
committerMarko Mäkelä <marko.makela@mariadb.com>2022-08-18 17:39:46 +0300
commit8b9043c6220b67a342ad4f5b8c70975e04410f91 (patch)
treedf780bcfe7ee97a4e45fed05b43e23ebec1e68f5
parentb4fb14f5931b1bd97e811e01336391bba891b0ad (diff)
downloadmariadb-git-bb-10.8-MDEV-29043.tar.gz
MDEV-29043 mariabackup --compress hangsbb-10.8-MDEV-29043
compress_write(): Fix a race condition between threads that would use the same worker thread object. Make thd->data_avail contain the thread identifier of the submitter, and add thd->avail_cond to notify other compress_write() threads that are waiting for a slot.
-rw-r--r--extra/mariabackup/ds_compress.cc51
1 files changed, 43 insertions, 8 deletions
diff --git a/extra/mariabackup/ds_compress.cc b/extra/mariabackup/ds_compress.cc
index 64f342df462..f7a9b7a1fbd 100644
--- a/extra/mariabackup/ds_compress.cc
+++ b/extra/mariabackup/ds_compress.cc
@@ -34,9 +34,10 @@ typedef struct {
pthread_t id;
uint num;
pthread_mutex_t data_mutex;
+ pthread_cond_t avail_cond;
pthread_cond_t data_cond;
pthread_cond_t done_cond;
- my_bool data_avail;
+ pthread_t data_avail;
my_bool cancelled;
const char *from;
size_t from_len;
@@ -195,9 +196,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
threads = comp_ctxt->threads;
nthreads = comp_ctxt->nthreads;
+ const pthread_t self = pthread_self();
+
ptr = (const char *) buf;
while (len > 0) {
- uint max_thread;
+ bool wait = nthreads == 1;
+retry:
+ bool submitted = false;
/* Send data to worker threads for compression */
for (i = 0; i < nthreads; i++) {
@@ -206,16 +211,33 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
+ if (thd->data_avail == pthread_t(~0UL)) {
+ } else if (!wait) {
+skip:
+ pthread_mutex_unlock(&thd->data_mutex);
+ continue;
+ } else {
+ for (;;) {
+ pthread_cond_wait(&thd->avail_cond,
+ &thd->data_mutex);
+ if (thd->data_avail
+ == pthread_t(~0UL)) {
+ break;
+ }
+ goto skip;
+ }
+ }
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len;
thd->from = ptr;
thd->from_len = chunk_len;
- thd->data_avail = TRUE;
+ thd->data_avail = self;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);
+ submitted = true;
len -= chunk_len;
if (len == 0) {
break;
@@ -223,13 +245,20 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
ptr += chunk_len;
}
- max_thread = (i < nthreads) ? i : nthreads - 1;
+ if (!submitted) {
+ wait = true;
+ goto retry;
+ }
- /* Reap and stream the compressed data */
- for (i = 0; i <= max_thread; i++) {
+ for (i = 0; i < nthreads; i++) {
thd = threads + i;
pthread_mutex_lock(&thd->data_mutex);
+ if (thd->data_avail != self) {
+ pthread_mutex_unlock(&thd->data_mutex);
+ continue;
+ }
+
while (!thd->to_len) {
pthread_cond_wait(&thd->done_cond,
&thd->data_mutex);
@@ -247,6 +276,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
}
thd->to_len = 0;
+ thd->data_avail = pthread_t(~0UL);
+ pthread_cond_signal(&thd->avail_cond);
pthread_mutex_unlock(&thd->data_mutex);
if (fail) {
@@ -334,6 +365,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd)
pthread_join(thd->id, NULL);
+ pthread_cond_destroy(&thd->avail_cond);
pthread_cond_destroy(&thd->data_cond);
pthread_cond_destroy(&thd->done_cond);
pthread_mutex_destroy(&thd->data_mutex);
@@ -364,11 +396,14 @@ create_worker_threads(uint n)
/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
+ pthread_cond_init(&thd->avail_cond, NULL) ||
pthread_cond_init(&thd->data_cond, NULL) ||
pthread_cond_init(&thd->done_cond, NULL)) {
goto err;
}
+ thd->data_avail = pthread_t(~0UL);
+
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) {
msg("compress: pthread_create() failed: "
@@ -410,13 +445,13 @@ compress_worker_thread_func(void *arg)
pthread_mutex_lock(&thd->data_mutex);
while (1) {
- while (!thd->data_avail && !thd->cancelled) {
+ while (!thd->cancelled
+ && (thd->to_len || thd->data_avail == pthread_t(~0UL))) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
}
if (thd->cancelled)
break;
- thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state);