diff options
Diffstat (limited to 'storage/innobase/include/sync0rw.ic')
-rw-r--r-- | storage/innobase/include/sync0rw.ic | 368 |
1 files changed, 197 insertions, 171 deletions
diff --git a/storage/innobase/include/sync0rw.ic b/storage/innobase/include/sync0rw.ic index eea639f26f4..75b696e3915 100644 --- a/storage/innobase/include/sync0rw.ic +++ b/storage/innobase/include/sync0rw.ic @@ -62,40 +62,48 @@ rw_lock_set_waiters( { lock->waiters = flag; } + +/********************************************************************** +Returns the write-status of the lock - this function made more sense +with the old rw_lock implementation. + */ UNIV_INLINE ulint rw_lock_get_writer( /*===============*/ rw_lock_t* lock) { - return(lock->writer); -} -UNIV_INLINE -void -rw_lock_set_writer( -/*===============*/ - rw_lock_t* lock, - ulint flag) -{ - lock->writer = flag; + lint lock_word = lock->lock_word; + if(lock_word > 0) { + /* return NOT_LOCKED in s-lock state, like the writer + member of the old lock implementation. */ + return RW_LOCK_NOT_LOCKED; + } else if (((-lock_word) % X_LOCK_DECR) == 0) { + return RW_LOCK_EX; + } else { + ut_ad(lock_word > -X_LOCK_DECR); + return RW_LOCK_WAIT_EX; + } } + UNIV_INLINE ulint rw_lock_get_reader_count( /*=====================*/ rw_lock_t* lock) { - return(lock->reader_count); -} -UNIV_INLINE -void -rw_lock_set_reader_count( -/*=====================*/ - rw_lock_t* lock, - ulint count) -{ - lock->reader_count = count; + lint lock_word = lock->lock_word; + if(lock_word > 0) { + /* s-locked, no x-waiters */ + return(X_LOCK_DECR - lock_word); + } else if (lock_word < 0 && lock_word > -X_LOCK_DECR) { + /* s-locked, with x-waiters */ + return (ulint)(-lock_word); + } + return 0; } + +#ifndef HAVE_GCC_ATOMIC_BUILTINS UNIV_INLINE mutex_t* rw_lock_get_mutex( @@ -104,6 +112,7 @@ rw_lock_get_mutex( { return(&(lock->mutex)); } +#endif /********************************************************************** Returns the value of writer_count for the lock. Does not reserve the lock @@ -115,7 +124,87 @@ rw_lock_get_x_lock_count( /* out: value of writer_count */ rw_lock_t* lock) /* in: rw-lock */ { - return(lock->writer_count); + lint lock_copy = lock->lock_word; + /* If there is a reader, lock_word is not divisible by X_LOCK_DECR */ + if(lock_copy > 0 || (-lock_copy) % X_LOCK_DECR != 0) { + return 0; + } + return ((-lock_copy) / X_LOCK_DECR) + 1; +} + +/********************************************************************** +Two different implementations for decrementing the lock_word of a rw_lock: +one for systems supporting atomic operations, one for others. This does +does not support recusive x-locks: they should be handled by the caller and +need not be atomic since they are performed by the current lock holder. +Returns true if the decrement was made, false if not. */ +UNIV_INLINE +ibool +rw_lock_lock_word_decr( + /* out: TRUE if decr occurs */ + rw_lock_t* lock, /* in: rw-lock */ + ulint amount) /* in: amount of decrement */ +{ + +#ifdef HAVE_GCC_ATOMIC_BUILTINS + + lint local_lock_word = lock->lock_word; + while (local_lock_word > 0) { + if(os_compare_and_swap(&(lock->lock_word), + local_lock_word, + local_lock_word - amount)) { + return TRUE; + } + local_lock_word = lock->lock_word; + } + return(FALSE); + +#else /* HAVE_GCC_ATOMIC_BUILTINS */ + + ibool success = FALSE; + mutex_enter(&(lock->mutex)); + if(lock->lock_word > 0) { + lock->lock_word -= amount; + success = TRUE; + } + mutex_exit(&(lock->mutex)); + return success; + +#endif /* HAVE_GCC_ATOMIC_BUILTINS */ + +} + +/********************************************************************** +Two different implementations for incrementing the lock_word of a rw_lock: +one for systems supporting atomic operations, one for others. +Returns the value of lock_word after increment. */ +UNIV_INLINE +lint +rw_lock_lock_word_incr( + /* out: lock->lock_word after increment */ + rw_lock_t* lock, /* in: rw-lock */ + ulint amount) /* in: amount of increment */ +{ + +#ifdef HAVE_GCC_ATOMIC_BUILTINS + + return(os_atomic_increment(&(lock->lock_word), amount)); + +#else /* HAVE_GCC_ATOMIC_BUILTINS */ + + lint local_lock_word; + + mutex_enter(&(lock->mutex)); + + lock->lock_word += amount; + local_lock_word = lock->lock_word; + + mutex_exit(&(lock->mutex)); + + return local_lock_word; + +#endif /* HAVE_GCC_ATOMIC_BUILTINS */ + } /********************************************************************** @@ -133,25 +222,21 @@ rw_lock_s_lock_low( const char* file_name, /* in: file name where lock requested */ ulint line) /* in: line where requested */ { - ut_ad(mutex_own(rw_lock_get_mutex(lock))); - - /* Check if the writer field is free */ - - if (UNIV_LIKELY(lock->writer == RW_LOCK_NOT_LOCKED)) { - /* Set the shared lock by incrementing the reader count */ - lock->reader_count++; + /* TODO: study performance of UNIV_LIKELY branch prediction hints. */ + if (!rw_lock_lock_word_decr(lock, 1)) { + /* Locking did not succeed */ + return(FALSE); + } #ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name, - line); + rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name, line); #endif - lock->last_s_file_name = file_name; - lock->last_s_line = line; - - return(TRUE); /* locking succeeded */ - } + /* These debugging values are not set safely: they may be incorrect + or even refer to a line that is invalid for the file name. */ + lock->last_s_file_name = file_name; + lock->last_s_line = line; - return(FALSE); /* locking did not succeed */ + return(TRUE); /* locking succeeded */ } /********************************************************************** @@ -166,11 +251,10 @@ rw_lock_s_lock_direct( const char* file_name, /* in: file name where requested */ ulint line) /* in: line where lock requested */ { - ut_ad(lock->writer == RW_LOCK_NOT_LOCKED); - ut_ad(rw_lock_get_reader_count(lock) == 0); + ut_ad(lock->lock_word == X_LOCK_DECR); - /* Set the shared lock by incrementing the reader count */ - lock->reader_count++; + /* Indicate there is a new reader by decrementing lock_word */ + lock->lock_word--; lock->last_s_file_name = file_name; lock->last_s_line = line; @@ -193,12 +277,10 @@ rw_lock_x_lock_direct( ulint line) /* in: line where lock requested */ { ut_ad(rw_lock_validate(lock)); - ut_ad(rw_lock_get_reader_count(lock) == 0); - ut_ad(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED); + ut_ad(lock->lock_word == X_LOCK_DECR); - rw_lock_set_writer(lock, RW_LOCK_EX); + lock->lock_word -= X_LOCK_DECR; lock->writer_thread = os_thread_get_curr_id(); - lock->writer_count++; lock->pass = 0; lock->last_x_file_name = file_name; @@ -240,15 +322,12 @@ rw_lock_s_lock_func( ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED)); /* see NOTE above */ #endif /* UNIV_SYNC_DEBUG */ - mutex_enter(rw_lock_get_mutex(lock)); - - if (UNIV_LIKELY(rw_lock_s_lock_low(lock, pass, file_name, line))) { - mutex_exit(rw_lock_get_mutex(lock)); + /* TODO: study performance of UNIV_LIKELY branch prediction hints. */ + if (rw_lock_s_lock_low(lock, pass, file_name, line)) { return; /* Success */ } else { /* Did not succeed, try spin wait */ - mutex_exit(rw_lock_get_mutex(lock)); rw_lock_s_lock_spin(lock, pass, file_name, line); @@ -258,86 +337,66 @@ rw_lock_s_lock_func( /********************************************************************** NOTE! Use the corresponding macro, not directly this function! Lock an -rw-lock in shared mode for the current thread if the lock can be acquired -immediately. */ +rw-lock in exclusive mode for the current thread if the lock can be +obtained immediately. */ UNIV_INLINE ibool -rw_lock_s_lock_func_nowait( +rw_lock_x_lock_func_nowait( /*=======================*/ /* out: TRUE if success */ rw_lock_t* lock, /* in: pointer to rw-lock */ const char* file_name,/* in: file name where lock requested */ ulint line) /* in: line where requested */ { - ibool success = FALSE; - - mutex_enter(rw_lock_get_mutex(lock)); + os_thread_id_t curr_thread = os_thread_get_curr_id(); - if (lock->writer == RW_LOCK_NOT_LOCKED) { - /* Set the shared lock by incrementing the reader count */ - lock->reader_count++; + ibool success; -#ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, 0, RW_LOCK_SHARED, file_name, - line); -#endif - - lock->last_s_file_name = file_name; - lock->last_s_line = line; +#ifdef HAVE_GCC_ATOMIC_BUILTINS + success = os_compare_and_swap(&(lock->lock_word), X_LOCK_DECR, 0); +#else + success = FALSE; + mutex_enter(&(lock->mutex)); + if(lock->lock_word == X_LOCK_DECR) { + lock->lock_word = 0; success = TRUE; } + mutex_exit(&(lock->mutex)); - mutex_exit(rw_lock_get_mutex(lock)); - - return(success); -} - -/********************************************************************** -NOTE! Use the corresponding macro, not directly this function! Lock an -rw-lock in exclusive mode for the current thread if the lock can be -obtained immediately. */ -UNIV_INLINE -ibool -rw_lock_x_lock_func_nowait( -/*=======================*/ - /* out: TRUE if success */ - rw_lock_t* lock, /* in: pointer to rw-lock */ - const char* file_name,/* in: file name where lock requested */ - ulint line) /* in: line where requested */ -{ - ibool success = FALSE; - os_thread_id_t curr_thread = os_thread_get_curr_id(); - mutex_enter(rw_lock_get_mutex(lock)); - - if (UNIV_UNLIKELY(rw_lock_get_reader_count(lock) != 0)) { - } else if (UNIV_LIKELY(rw_lock_get_writer(lock) - == RW_LOCK_NOT_LOCKED)) { - rw_lock_set_writer(lock, RW_LOCK_EX); +#endif + if(success) { lock->writer_thread = curr_thread; lock->pass = 0; -relock: - lock->writer_count++; -#ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line); -#endif + } else if (!(lock->pass) && + os_thread_eq(lock->writer_thread, curr_thread)) { + /* Must verify pass first: otherwise another thread can + call move_ownership suddenly allowing recursive locks. + and after we have verified our thread_id matches + (though move_ownership has since changed it).*/ - lock->last_x_file_name = file_name; - lock->last_x_line = line; + /* Relock: this lock_word modification is safe since no other + threads can modify (lock, unlock, or reserve) lock_word while + there is an exclusive writer and this is the writer thread. */ + lock->lock_word -= X_LOCK_DECR; - success = TRUE; - } else if (rw_lock_get_writer(lock) == RW_LOCK_EX - && lock->pass == 0 - && os_thread_eq(lock->writer_thread, curr_thread)) { - goto relock; + ut_ad(((-lock->lock_word) % X_LOCK_DECR) == 0); + + } else { + /* Failure */ + return(FALSE); } +#ifdef UNIV_SYNC_DEBUG + rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line); +#endif - mutex_exit(rw_lock_get_mutex(lock)); + lock->last_x_file_name = file_name; + lock->last_x_line = line; ut_ad(rw_lock_validate(lock)); - return(success); + return(TRUE); } /********************************************************************** @@ -353,39 +412,21 @@ rw_lock_s_unlock_func( #endif ) { - mutex_t* mutex = &(lock->mutex); - ibool sg = FALSE; - - /* Acquire the mutex protecting the rw-lock fields */ - mutex_enter(mutex); - - /* Reset the shared lock by decrementing the reader count */ - - ut_a(lock->reader_count > 0); - lock->reader_count--; + ut_ad((lock->lock_word % X_LOCK_DECR) != 0); #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, pass, RW_LOCK_SHARED); #endif - /* If there may be waiters and this was the last s-lock, - signal the object */ - - if (UNIV_UNLIKELY(lock->waiters) - && lock->reader_count == 0) { - sg = TRUE; + /* Increment lock_word to indicate 1 less reader */ + if(rw_lock_lock_word_incr(lock, 1) == 0) { - rw_lock_set_waiters(lock, 0); - } - - mutex_exit(mutex); - - if (UNIV_UNLIKELY(sg)) { -#ifdef __WIN__ + /* wait_ex waiter exists. It may not be asleep, but we signal + anyway. We do not wake other waiters, because they can't + exist without wait_ex waiter and wait_ex waiter goes first.*/ os_event_set(lock->wait_ex_event); -#endif - os_event_set(lock->event); sync_array_object_signalled(sync_primary_wait_array); + } ut_ad(rw_lock_validate(lock)); @@ -404,16 +445,15 @@ rw_lock_s_unlock_direct( /*====================*/ rw_lock_t* lock) /* in: rw-lock */ { - /* Reset the shared lock by decrementing the reader count */ - - ut_ad(lock->reader_count > 0); - - lock->reader_count--; + ut_ad(lock->lock_word < X_LOCK_DECR); #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, 0, RW_LOCK_SHARED); #endif + /* Decrease reader count by incrementing lock_word */ + lock->lock_word++; + ut_ad(!lock->waiters); ut_ad(rw_lock_validate(lock)); #ifdef UNIV_SYNC_PERF_STAT @@ -434,42 +474,32 @@ rw_lock_x_unlock_func( #endif ) { - ibool sg = FALSE; + ut_ad((lock->lock_word % X_LOCK_DECR) == 0); - /* Acquire the mutex protecting the rw-lock fields */ - mutex_enter(&(lock->mutex)); - - /* Reset the exclusive lock if this thread no longer has an x-mode - lock */ - - ut_ad(lock->writer_count > 0); - - lock->writer_count--; - - if (lock->writer_count == 0) { - rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED); - } + /* Must reset writer_thread while we still have the lock. + If we are not the last unlocker, we correct it later in the function, + which is harmless since we still hold the lock. */ + /* TODO: are there any risks of a thread id == -1 on any platform? */ + os_thread_id_t local_writer_thread = lock->writer_thread; + lock->writer_thread = -1; #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, pass, RW_LOCK_EX); #endif - /* If there may be waiters, signal the lock */ - if (UNIV_UNLIKELY(lock->waiters) - && lock->writer_count == 0) { - - sg = TRUE; - rw_lock_set_waiters(lock, 0); - } - - mutex_exit(&(lock->mutex)); + if(rw_lock_lock_word_incr(lock, X_LOCK_DECR) == X_LOCK_DECR) { + /* Lock is now free. May have to signal read/write waiters. + We do not need to signal wait_ex waiters, since they cannot + exist when there is a writer. */ + if(lock->waiters) { + rw_lock_set_waiters(lock, 0); + os_event_set(lock->event); + sync_array_object_signalled(sync_primary_wait_array); + } - if (UNIV_UNLIKELY(sg)) { -#ifdef __WIN__ - os_event_set(lock->wait_ex_event); -#endif - os_event_set(lock->event); - sync_array_object_signalled(sync_primary_wait_array); + } else { + /* We still hold x-lock, so we correct writer_thread. */ + lock->writer_thread = local_writer_thread; } ut_ad(rw_lock_validate(lock)); @@ -491,18 +521,14 @@ rw_lock_x_unlock_direct( /* Reset the exclusive lock if this thread no longer has an x-mode lock */ - ut_ad(lock->writer_count > 0); - - lock->writer_count--; - - if (lock->writer_count == 0) { - rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED); - } + ut_ad((lock->lock_word % X_LOCK_DECR) == 0); #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, 0, RW_LOCK_EX); #endif + lock->lock_word += X_LOCK_DECR; + ut_ad(!lock->waiters); ut_ad(rw_lock_validate(lock)); |