diff options
author | Nikita Malyavin <nikitamalyavin@gmail.com> | 2021-08-22 05:51:01 +0300 |
---|---|---|
committer | Nikita Malyavin <nikitamalyavin@gmail.com> | 2021-08-27 12:20:02 +1000 |
commit | 47bd795b35c0e6b259dc16521966232cbdd7c190 (patch) | |
tree | fb223111cf12b162720c35909af052d75fbb6a7d | |
parent | 98ab2af187c79f8a05421b2e252583dfc8850265 (diff) | |
download | mariadb-git-MDEV-24676_cpp.tar.gz |
fix wraparound race condition MDEV-24676_cpp
also improve locking granularity regarding flush
remove atomic on bool, since it is already protected under lock
add more volatile specs
add comments, improve structure packing
-rw-r--r-- | io_cache_test/io_cache_test.cpp | 4 | ||||
-rw-r--r-- | io_cache_test/ring_buffer.hpp | 111 |
2 files changed, 76 insertions, 39 deletions
diff --git a/io_cache_test/io_cache_test.cpp b/io_cache_test/io_cache_test.cpp index 947baa6cc47..5bb8dd0f1db 100644 --- a/io_cache_test/io_cache_test.cpp +++ b/io_cache_test/io_cache_test.cpp @@ -18,6 +18,8 @@ void *read_to_cache(void *) { } void *write_to_cache(void *args) { + struct st_my_thread_var thdvar; + set_mysys_var(&thdvar); pthread_barrier_wait(&barrier); int *v_args= (int *) args; for (int i= v_args[0]; i < v_args[1]; ++i) @@ -90,4 +92,4 @@ int main() { of.open("test_out.txt", std::ios_base::out); of << buff_to; return 0; -}
\ No newline at end of file +} diff --git a/io_cache_test/ring_buffer.hpp b/io_cache_test/ring_buffer.hpp index 9566c6132c5..7ca8fc313ec 100644 --- a/io_cache_test/ring_buffer.hpp +++ b/io_cache_test/ring_buffer.hpp @@ -3,6 +3,7 @@ #include <mysys_priv.h> #include <semaphore.h> #include <atomic> +#include <array> class RingBuffer { public: @@ -14,15 +15,15 @@ public: private: struct cache_slot_t { - std::atomic<bool> vacant{true}; + volatile bool vacant= true; - volatile bool finished = false; + std::atomic<bool> finished {false}; volatile int next = -1; uchar* pos_write_first = nullptr; - /* For wrapping case in curricular write buffer*/ + /** For wrapping case in curricular write buffer */ uchar* pos_write_second = nullptr; uchar* pos_end = nullptr; @@ -30,17 +31,27 @@ private: volatile size_t count_first = 0; volatile size_t count_second = 0; + + /** + Each time the buffer wraps, its version increases. Then it's compared + with slot version to avoid the race when the slot was cleared by other + thread and then re-occupied by a new writer (i.e. vacant == false) + in the same place (i.e. write_pos and lenth are the same) + */ + volatile longlong wrap_version; }; static const int count_thread_for_slots = 4; cache_slot_t _slots[count_thread_for_slots]; - /* Semaphore for predict overflow */ - sem_t semaphore; + /** Last used slot */ + volatile int last_slot= -1; - /* Last used slot */ - int last_slot = -1; + volatile longlong version= 1; + + /** This semaphore prevents slots overflow */ + sem_t semaphore; mysql_rwlock_t flush_rw_lock; @@ -72,6 +83,8 @@ private: /* Maximum of the actual end of file and the position represented by read_end. 
+ + Is protected by flush_rw_lock */ my_off_t _end_of_file; @@ -112,7 +125,7 @@ private: RingBuffer::RingBuffer(char* filename, size_t cachesize) { _total_size = 0; - sem_init(&semaphore, 0, count_thread_for_slots); + sem_init(&semaphore, 0, count_thread_for_slots-1); mysql_rwlock_init(0, &flush_rw_lock); _file = my_open(filename,O_CREAT | O_RDWR,MYF(MY_WME)); if (_file >= 0) { @@ -162,6 +175,7 @@ RingBuffer::RingBuffer(char* filename, size_t cachesize) { mysql_mutex_init(key_IO_CACHE_append_buffer_lock, &_buffer_lock, MY_MUTEX_INIT_FAST); } + RingBuffer::~RingBuffer() { sem_destroy(&semaphore); mysql_rwlock_destroy(&flush_rw_lock); @@ -178,26 +192,62 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) { sem_wait(&semaphore); int i; mysql_mutex_lock(&_buffer_lock); - for (i = 0; i < count_thread_for_slots; ++i) { + for (i= 0; i < count_thread_for_slots; ++i) + { auto &vacant= _slots[i].vacant; - if(vacant.load(std::memory_order_relaxed) - && vacant.exchange(false, std::memory_order_acquire)) + if (vacant) + { + vacant= false; + _slots[i].wrap_version= version; break; + } } + assert(i != count_thread_for_slots); if(Count > (_buffer_length - _total_size)) { + /* + Buffer is full, flush to disk. + 1. Wait for all writes finished by wlock(flush_rw_lock) + 2. Re-initialize slots and increase version + 3. Unlock buffer_lock to allow other writers exit (or else they'll wait + in release() until flushing is done). + 4. Fill out the rest of buffer under exclusive lock and write to file. 
+ */ + + mysql_rwlock_wrlock(&flush_rw_lock); + + uchar *save_write_new_pos= _write_new_pos; + + DBUG_ASSERT(_append_read_pos) + _write_new_pos = _write_buffer; + _write_pos= _write_buffer; + _total_size = 0; + + for (int j = 0; j < count_thread_for_slots; j++) { + if(j == i) + continue; + if(!_slots[j].vacant) + sem_post(&semaphore); + _slots[j].finished= false; + _slots[j].vacant= true; + _slots[j].next= -1; + _slots[j].pos_write_first= nullptr; + _slots[j].pos_write_second= nullptr; + _slots[j].pos_end= nullptr; + } + last_slot = -1; - if(_write_new_pos < _append_read_pos) { - size_t rest_length = _append_read_pos - _write_new_pos; - memcpy(_write_new_pos, From, rest_length); + if(save_write_new_pos < _append_read_pos) { + size_t rest_length = _append_read_pos - save_write_new_pos; + memcpy(save_write_new_pos, From, rest_length); _total_size += rest_length; Count -= rest_length; From += rest_length; - _write_pos = _write_new_pos + rest_length; + _write_pos = save_write_new_pos + rest_length; } else { - size_t rest_length_to_right_border = _write_end - _write_new_pos; - memcpy(_write_new_pos, From, rest_length_to_right_border); + size_t rest_length_to_right_border = _write_end - save_write_new_pos; + memcpy(save_write_new_pos, From, rest_length_to_right_border); _total_size += rest_length_to_right_border; Count -= rest_length_to_right_border; From += rest_length_to_right_border; @@ -210,8 +260,8 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) { From += rest_length_to_read_border; _write_pos = _write_buffer + rest_length_to_read_border; } - mysql_rwlock_wrlock(&flush_rw_lock); _flush_io_buffer(i); + _append_read_pos= _write_buffer; mysql_rwlock_unlock(&flush_rw_lock); } @@ -231,6 +281,7 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) { rest_length_to_right_border = _write_end - _write_new_pos; if(Count > rest_length_to_right_border) { + version++; _slots[i].count_first = rest_length_to_right_border; _slots[i].pos_write_second = 
_write_buffer; _slots[i].count_second = Count - rest_length_to_right_border; @@ -249,10 +300,13 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) { bool RingBuffer::_slot_release(int slot_id) { + auto version_here= _slots[slot_id].wrap_version; _slots[slot_id].finished = true; mysql_rwlock_unlock(&flush_rw_lock); + DEBUG_SYNC(nullptr, "slot_release"); mysql_mutex_lock(&_buffer_lock); - if(last_slot != -1 && _write_pos == _slots[slot_id].pos_write_first) { + if (last_slot != -1 && version_here == _slots[slot_id].wrap_version + && _write_pos == _slots[slot_id].pos_write_first) { do { _write_pos = _slots[slot_id].pos_end; @@ -312,27 +366,8 @@ int RingBuffer::_flush_io_buffer(int not_released) { } _end_of_file+= _total_size; - _write_new_pos = _append_read_pos= _write_buffer; - DBUG_ASSERT(_end_of_file == mysql_file_tell(_file, MYF(0))); - _write_pos= _write_buffer; - _total_size = 0; - - for (int i = 0; i < count_thread_for_slots; i++) { - if(i == not_released) - continue; - if(!_slots[i].vacant) - sem_post(&semaphore); - _slots[i].finished= false; - _slots[i].vacant= true; - _slots[i].next= -1; - _slots[i].pos_write_first= nullptr; - _slots[i].pos_write_second= nullptr; - _slots[i].pos_end= nullptr; - } - last_slot = -1; - return _error; } |