author     Nikita Malyavin <nikitamalyavin@gmail.com>  2021-08-22 05:51:01 +0300
committer  Nikita Malyavin <nikitamalyavin@gmail.com>  2021-08-27 12:20:02 +1000
commit     47bd795b35c0e6b259dc16521966232cbdd7c190 (patch)
tree       fb223111cf12b162720c35909af052d75fbb6a7d
parent     98ab2af187c79f8a05421b2e252583dfc8850265 (diff)
download   mariadb-git-MDEV-24676_cpp.tar.gz
fix wraparound race condition (MDEV-24676_cpp)
also:
- improve locking granularity regarding flush
- remove atomic on bool, since it is already protected under lock
- add more volatile specs
- add comments, improve structure packing
-rw-r--r--  io_cache_test/io_cache_test.cpp    4
-rw-r--r--  io_cache_test/ring_buffer.hpp    111
2 files changed, 76 insertions, 39 deletions
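One point from the message above — removing std::atomic from a bool that is only ever touched under the buffer lock — is worth a standalone illustration. A minimal, hypothetical sketch (names are not from this patch): the mutex already orders all accesses, so a plain bool suffices; a flag that is also written outside the lock (as finished is in this patch) is the one that keeps std::atomic.

#include <mutex>

// Hypothetical stand-ins for the patch's slot/pool types.
struct Slot
{
  bool vacant= true;            // only read/written while holding Pool::lock
};

struct Pool
{
  std::mutex lock;
  Slot slot;

  bool try_take()
  {
    std::lock_guard<std::mutex> g(lock);
    if (!slot.vacant)
      return false;
    slot.vacant= false;         // safe: every access happens under the same lock
    return true;
  }

  void put_back()
  {
    std::lock_guard<std::mutex> g(lock);
    slot.vacant= true;
  }
};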
diff --git a/io_cache_test/io_cache_test.cpp b/io_cache_test/io_cache_test.cpp
index 947baa6cc47..5bb8dd0f1db 100644
--- a/io_cache_test/io_cache_test.cpp
+++ b/io_cache_test/io_cache_test.cpp
@@ -18,6 +18,8 @@ void *read_to_cache(void *) {
}
void *write_to_cache(void *args) {
+ struct st_my_thread_var thdvar;
+ set_mysys_var(&thdvar);
pthread_barrier_wait(&barrier);
int *v_args= (int *) args;
for (int i= v_args[0]; i < v_args[1]; ++i)
@@ -90,4 +92,4 @@ int main() {
of.open("test_out.txt", std::ios_base::out);
of << buff_to;
return 0;
-} \ No newline at end of file
+}
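The two added lines register a mysys thread descriptor for the raw pthread writer; DEBUG_SYNC (introduced later in _slot_release) needs the calling thread to have one. A minimal sketch of the pattern, with a hypothetical entry-point name, assuming the same private header ring_buffer.hpp already includes pulls in the needed declarations:

#include <mysys_priv.h>                 /* assumed to provide st_my_thread_var / set_mysys_var */

static void *writer_thread(void *arg)   /* hypothetical thread entry point */
{
  struct st_my_thread_var thdvar;       /* must stay alive for the thread's lifetime */
  set_mysys_var(&thdvar);               /* make it current, so DEBUG_SYNC can find it */
  /* ... write through the ring buffer as the test does ... */
  return nullptr;
}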
diff --git a/io_cache_test/ring_buffer.hpp b/io_cache_test/ring_buffer.hpp
index 9566c6132c5..7ca8fc313ec 100644
--- a/io_cache_test/ring_buffer.hpp
+++ b/io_cache_test/ring_buffer.hpp
@@ -3,6 +3,7 @@
#include <mysys_priv.h>
#include <semaphore.h>
#include <atomic>
+#include <array>
class RingBuffer {
public:
@@ -14,15 +15,15 @@ public:
private:
struct cache_slot_t {
- std::atomic<bool> vacant{true};
+ volatile bool vacant= true;
- volatile bool finished = false;
+ std::atomic<bool> finished {false};
volatile int next = -1;
uchar* pos_write_first = nullptr;
- /* For wrapping case in curricular write buffer*/
+ /** For the wrapping case in the circular write buffer */
uchar* pos_write_second = nullptr;
uchar* pos_end = nullptr;
@@ -30,17 +31,27 @@ private:
volatile size_t count_first = 0;
volatile size_t count_second = 0;
+
+ /**
+ Each time the buffer wraps, its version increases. It is then compared
+ with the slot's version to avoid the race where the slot was cleared by
+ another thread and then re-occupied by a new writer (i.e. vacant == false)
+ at the same place (i.e. write_pos and length are the same).
+ */
+ volatile longlong wrap_version;
};
static const int count_thread_for_slots = 4;
cache_slot_t _slots[count_thread_for_slots];
- /* Semaphore for predict overflow */
- sem_t semaphore;
+ /** Last used slot */
+ volatile int last_slot= -1;
- /* Last used slot */
- int last_slot = -1;
+ volatile longlong version= 1;
+
+ /** This semaphore prevents slot overflow */
+ sem_t semaphore;
mysql_rwlock_t flush_rw_lock;
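The wrap_version/version pair documented above guards against an ABA-style race: after a wrap, a slot can be recycled and end up at the same write position as before, so position equality alone is not a safe test. A standalone illustration of the check with hypothetical names, not the actual member code:

// Hypothetical, self-contained restatement of the versioned-slot check.
struct slot_view
{
  long long wrap_version;              // copied from the buffer when the slot was taken
  const unsigned char *pos_write_first;
};

static bool is_still_my_write(const slot_view &s,
                              long long version_seen_at_release,
                              const unsigned char *current_write_pos)
{
  // A bare position match is ambiguous once the buffer has wrapped and the
  // slot has been re-occupied; requiring the version to match as well
  // distinguishes the original writer from the new occupant.
  return s.wrap_version == version_seen_at_release &&
         s.pos_write_first == current_write_pos;
}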
@@ -72,6 +83,8 @@ private:
/*
Maximum of the actual end of file and
the position represented by read_end.
+
+ Protected by flush_rw_lock.
*/
my_off_t _end_of_file;
@@ -112,7 +125,7 @@ private:
RingBuffer::RingBuffer(char* filename, size_t cachesize) {
_total_size = 0;
- sem_init(&semaphore, 0, count_thread_for_slots);
+ sem_init(&semaphore, 0, count_thread_for_slots-1);
mysql_rwlock_init(0, &flush_rw_lock);
_file = my_open(filename,O_CREAT | O_RDWR,MYF(MY_WME));
if (_file >= 0) {
@@ -162,6 +175,7 @@ RingBuffer::RingBuffer(char* filename, size_t cachesize) {
mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
&_buffer_lock, MY_MUTEX_INIT_FAST);
}
+
RingBuffer::~RingBuffer() {
sem_destroy(&semaphore);
mysql_rwlock_destroy(&flush_rw_lock);
@@ -178,26 +192,62 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
sem_wait(&semaphore);
int i;
mysql_mutex_lock(&_buffer_lock);
- for (i = 0; i < count_thread_for_slots; ++i) {
+ for (i= 0; i < count_thread_for_slots; ++i)
+ {
auto &vacant= _slots[i].vacant;
- if(vacant.load(std::memory_order_relaxed)
- && vacant.exchange(false, std::memory_order_acquire))
+ if (vacant)
+ {
+ vacant= false;
+ _slots[i].wrap_version= version;
break;
+ }
}
+ assert(i != count_thread_for_slots);
if(Count > (_buffer_length - _total_size)) {
+ /*
+ Buffer is full, flush it to disk.
+ 1. Wait for all in-flight writes to finish by taking wlock(flush_rw_lock).
+ 2. Re-initialize the slots and increase the version.
+ 3. Unlock buffer_lock so that other writers can exit (or else they'll wait
+ in release() until flushing is done).
+ 4. Fill the rest of the buffer under the exclusive lock and write it to file.
+ */
+
+ mysql_rwlock_wrlock(&flush_rw_lock);
+
+ uchar *save_write_new_pos= _write_new_pos;
+
+ DBUG_ASSERT(_append_read_pos);
+ _write_new_pos = _write_buffer;
+ _write_pos= _write_buffer;
+ _total_size = 0;
+
+ for (int j = 0; j < count_thread_for_slots; j++) {
+ if(j == i)
+ continue;
+ if(!_slots[j].vacant)
+ sem_post(&semaphore);
+ _slots[j].finished= false;
+ _slots[j].vacant= true;
+ _slots[j].next= -1;
+ _slots[j].pos_write_first= nullptr;
+ _slots[j].pos_write_second= nullptr;
+ _slots[j].pos_end= nullptr;
+ }
+ last_slot = -1;
- if(_write_new_pos < _append_read_pos) {
- size_t rest_length = _append_read_pos - _write_new_pos;
- memcpy(_write_new_pos, From, rest_length);
+ if(save_write_new_pos < _append_read_pos) {
+ size_t rest_length = _append_read_pos - save_write_new_pos;
+ memcpy(save_write_new_pos, From, rest_length);
_total_size += rest_length;
Count -= rest_length;
From += rest_length;
- _write_pos = _write_new_pos + rest_length;
+ _write_pos = save_write_new_pos + rest_length;
}
else {
- size_t rest_length_to_right_border = _write_end - _write_new_pos;
- memcpy(_write_new_pos, From, rest_length_to_right_border);
+ size_t rest_length_to_right_border = _write_end - save_write_new_pos;
+ memcpy(save_write_new_pos, From, rest_length_to_right_border);
_total_size += rest_length_to_right_border;
Count -= rest_length_to_right_border;
From += rest_length_to_right_border;
@@ -210,8 +260,8 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
From += rest_length_to_read_border;
_write_pos = _write_buffer + rest_length_to_read_border;
}
- mysql_rwlock_wrlock(&flush_rw_lock);
_flush_io_buffer(i);
+ _append_read_pos= _write_buffer;
mysql_rwlock_unlock(&flush_rw_lock);
}
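Taking flush_rw_lock exclusively before the slots are reset (step 1 of the new comment) is what makes the re-initialization safe: every writer copies into its slot while holding the lock shared, so the exclusive acquisition cannot succeed until all in-flight copies are done. A self-contained model of that granularity, using std::shared_mutex in place of mysql_rwlock_t (not the project's code):

#include <shared_mutex>
#include <cstring>
#include <cstddef>

struct FlushModel
{
  std::shared_mutex flush_lock;        // models flush_rw_lock
  unsigned char buffer[4096];

  void writer_copy(const unsigned char *src, std::size_t len, std::size_t offset)
  {
    std::shared_lock<std::shared_mutex> rd(flush_lock);   // many writers in parallel
    std::memcpy(buffer + offset, src, len);
  }

  void flush_to_disk()
  {
    std::unique_lock<std::shared_mutex> wr(flush_lock);   // waits for every writer_copy
    /* write `buffer` out, reset positions, re-arm the slot pool */
  }
};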
@@ -231,6 +281,7 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
rest_length_to_right_border = _write_end - _write_new_pos;
if(Count > rest_length_to_right_border) {
+ version++;
_slots[i].count_first = rest_length_to_right_border;
_slots[i].pos_write_second = _write_buffer;
_slots[i].count_second = Count - rest_length_to_right_border;
@@ -249,10 +300,13 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
bool RingBuffer::_slot_release(int slot_id) {
+ auto version_here= _slots[slot_id].wrap_version;
_slots[slot_id].finished = true;
mysql_rwlock_unlock(&flush_rw_lock);
+ DEBUG_SYNC(nullptr, "slot_release");
mysql_mutex_lock(&_buffer_lock);
- if(last_slot != -1 && _write_pos == _slots[slot_id].pos_write_first) {
+ if (last_slot != -1 && version_here == _slots[slot_id].wrap_version
+ && _write_pos == _slots[slot_id].pos_write_first) {
do {
_write_pos = _slots[slot_id].pos_end;
@@ -312,27 +366,8 @@ int RingBuffer::_flush_io_buffer(int not_released) {
}
_end_of_file+= _total_size;
- _write_new_pos = _append_read_pos= _write_buffer;
-
DBUG_ASSERT(_end_of_file == mysql_file_tell(_file, MYF(0)));
- _write_pos= _write_buffer;
- _total_size = 0;
-
- for (int i = 0; i < count_thread_for_slots; i++) {
- if(i == not_released)
- continue;
- if(!_slots[i].vacant)
- sem_post(&semaphore);
- _slots[i].finished= false;
- _slots[i].vacant= true;
- _slots[i].next= -1;
- _slots[i].pos_write_first= nullptr;
- _slots[i].pos_write_second= nullptr;
- _slots[i].pos_end= nullptr;
- }
- last_slot = -1;
-
return _error;
}
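With the slot re-initialization moved out of _flush_io_buffer and into the overflow path of _slot_acquire (under the write lock, before the data is flushed), the semaphore bookkeeping stays in one place: sem_wait before claiming a slot, one sem_post per occupied slot that gets recycled. A minimal, hypothetical model of that counting discipline, with the credit count held one below the slot count as in the patched constructor:

#include <semaphore.h>

int main()
{
  const int count_thread_for_slots= 4;
  sem_t semaphore;
  /* one credit held back, mirroring sem_init(&semaphore, 0, count-1) above */
  sem_init(&semaphore, 0, count_thread_for_slots - 1);

  sem_wait(&semaphore);   /* writer: reserve a slot before scanning for a vacant one */
  /* ... claim the slot, copy data, mark it finished ... */
  sem_post(&semaphore);   /* flush/release path: the slot is vacant again */

  sem_destroy(&semaphore);
  return 0;
}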