diff options
author | unknown <bell@88-214-96-46.dialup.umc.net.ua> | 2008-02-22 23:21:27 +0200 |
---|---|---|
committer | unknown <bell@88-214-96-46.dialup.umc.net.ua> | 2008-02-22 23:21:27 +0200 |
commit | daacf696e524cfa633fda1c0123d052459da3148 (patch) | |
tree | 07e6152b069fc5a60a3ec0ecd91d05bf7f75eb52 | |
parent | 1dfeb4783f7da1941a08465bf62443fa229833a8 (diff) | |
parent | 190de95f6fcf37572be7cf2ff0543d74d190a989 (diff) | |
download | mariadb-git-daacf696e524cfa633fda1c0123d052459da3148.tar.gz |
Merge 88-214-96-46.dialup.umc.net.ua:/Users/bell/mysql/bk/mysql-maria
into 88-214-96-46.dialup.umc.net.ua:/Users/bell/mysql/bk/work-maria-test
storage/maria/ma_pagecache.c:
Auto merged
storage/maria/unittest/Makefile.am:
merge
storage/maria/unittest/ma_pagecache_consist.c:
merge
-rw-r--r-- | include/my_pthread.h | 4 | ||||
-rw-r--r-- | include/wqueue.h | 1 | ||||
-rw-r--r-- | mysys/wqueue.c | 43 | ||||
-rw-r--r-- | storage/maria/ma_pagecache.c | 288 | ||||
-rw-r--r-- | storage/maria/unittest/Makefile.am | 4 | ||||
-rw-r--r-- | storage/maria/unittest/ma_pagecache_consist.c | 65 | ||||
-rw-r--r-- | storage/maria/unittest/ma_pagecache_rwconsist.c | 348 |
7 files changed, 656 insertions, 97 deletions
diff --git a/include/my_pthread.h b/include/my_pthread.h index 093cd9c000e..1841d91d8c5 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -674,6 +674,9 @@ extern int pthread_dummy(int); #endif #endif +#define MY_PTHREAD_LOCK_READ 0 +#define MY_PTHREAD_LOCK_WRITE 1 + struct st_my_thread_var { int thr_errno; @@ -688,6 +691,7 @@ struct st_my_thread_var my_bool init; struct st_my_thread_var *next,**prev; void *opt_info; + uint lock_type; /* used by conditional release the queue */ #ifndef DBUG_OFF void *dbug; char name[THREAD_NAME_SIZE+1]; diff --git a/include/wqueue.h b/include/wqueue.h index bacabb8c401..658f3d66f12 100644 --- a/include/wqueue.h +++ b/include/wqueue.h @@ -20,6 +20,7 @@ void wqueue_add_and_wait(WQUEUE *wqueue, struct st_my_thread_var *thread, pthread_mutex_t *lock); void wqueue_release_queue(WQUEUE *wqueue); +void wqueue_release_one_locktype_from_queue(WQUEUE *wqueue); #endif diff --git a/mysys/wqueue.c b/mysys/wqueue.c index bfe9cba1235..0766e13a4e4 100644 --- a/mysys/wqueue.c +++ b/mysys/wqueue.c @@ -136,6 +136,49 @@ void wqueue_release_queue(WQUEUE *wqueue) } +/** + @brief Removes all threads waiting for read or first one waiting for write. + + @param wqueue pointer to the queue structure + @apram thread pointer to the thread to be added to the queue +*/ + +void wqueue_release_one_locktype_from_queue(WQUEUE *wqueue) +{ + struct st_my_thread_var *last= wqueue->last_thread; + struct st_my_thread_var *next= last->next; + struct st_my_thread_var **prev= &wqueue->last_thread; + struct st_my_thread_var *thread; + uint first_type= next->lock_type; + if (first_type == MY_PTHREAD_LOCK_WRITE) + { + /* release first waiting for write lock */ + thread= next; + pthread_cond_signal(&thread->suspend); + wqueue->last_thread= next; + thread->next= NULL; + return; + } + do + { + thread= next; + next= thread->next; + if (thread->lock_type == MY_PTHREAD_LOCK_WRITE) + { + /* skip waiting for write lock */ + *prev= thread; + prev= &thread->next; + } + else + { + /* release waiting for read lock */ + pthread_cond_signal(&thread->suspend); + thread->next= NULL; + } + } while (thread != last); + *prev= NULL; +} + /* Add thread and wait diff --git a/storage/maria/ma_pagecache.c b/storage/maria/ma_pagecache.c index 2c2286d6b98..daea156bcf3 100644 --- a/storage/maria/ma_pagecache.c +++ b/storage/maria/ma_pagecache.c @@ -37,6 +37,9 @@ blocks_unused is the sum of never used blocks in the pool and of currently free blocks. blocks_used is the number of blocks fetched from the pool and as such gives the maximum number of in-use blocks at any time. + + TODO: Write operation locks whole cache till the end of the operation. + Should be fixed. */ #include "maria_def.h" @@ -96,7 +99,7 @@ #define PCBLOCK_INFO(B) \ DBUG_PRINT("info", \ ("block: 0x%lx fd: %lu page: %lu s: %0x hshL: 0x%lx req: %u/%u " \ - "wrlocks: %u pins: %u", \ + "wrlocks: %u rdlocks %u rdlocks_q: %u pins: %u", \ (ulong)(B), \ (ulong)((B)->hash_link ? \ (B)->hash_link->file.file : \ @@ -110,7 +113,7 @@ (uint)((B)->hash_link ? \ (B)->hash_link->requests : \ 0), \ - block->wlocks, \ + block->wlocks, block->rlocks, block->rlocks_queue, \ (uint)(B)->pins)) /* TODO: put it to my_static.c */ @@ -298,11 +301,7 @@ struct st_pagecache_block_link #endif KEYCACHE_CONDVAR *condvar; /* condition variable for 'no readers' event */ uchar *buffer; /* buffer for the block page */ -#ifdef THREAD pthread_t write_locker; -#else - int write_locker; -#endif ulonglong last_hit_time; /* timestamp of the last hit */ WQUEUE @@ -311,7 +310,9 @@ struct st_pagecache_block_link uint status; /* state of the block */ uint pins; /* pin counter */ uint wlocks; /* write locks counter */ - enum PCBLOCK_TEMPERATURE temperature; /* block temperature: cold, warm, hot */ + uint rlocks; /* read locks counter */ + uint rlocks_queue; /* rd. locks waiting wr. lock of this thread */ + enum PCBLOCK_TEMPERATURE temperature; /* block temperature: cold, warm, hot*/ enum pagecache_page_type type; /* type of the block */ uint hits_left; /* number of hits left until promotion */ /** @brief LSN when first became dirty; LSN_MAX means "not yet set" */ @@ -1938,6 +1939,8 @@ restart: } pagecache->blocks_unused--; DBUG_ASSERT(block->wlocks == 0); + DBUG_ASSERT(block->rlocks == 0); + DBUG_ASSERT(block->rlocks_queue == 0); DBUG_ASSERT(block->pins == 0); block->status= 0; #ifndef DBUG_OFF @@ -2003,6 +2006,8 @@ restart: } PCBLOCK_INFO(block); DBUG_ASSERT(block->wlocks == 0); + DBUG_ASSERT(block->rlocks == 0); + DBUG_ASSERT(block->rlocks_queue == 0); DBUG_ASSERT(block->pins == 0); if (block->hash_link != hash_link && @@ -2010,6 +2015,8 @@ restart: { /* this is a primary request for a new page */ DBUG_ASSERT(block->wlocks == 0); + DBUG_ASSERT(block->rlocks == 0); + DBUG_ASSERT(block->rlocks_queue == 0); DBUG_ASSERT(block->pins == 0); block->status|= PCBLOCK_IN_SWITCH; @@ -2196,17 +2203,82 @@ static void info_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl) #define info_change_lock(B,W) #endif -/* - Put on the block write lock - SYNOPSIS - get_wrlock() - pagecache pointer to a page cache data structure - block the block to work with +/** + @brief waiting for lock for read and write lock - RETURN - 0 - OK - 1 - Can't lock this block, need retry + @parem pagecache pointer to a page cache data structure + @parem block the block to work with + @param file file of the block when it was locked + @param pageno page number of the block when it was locked + @param lock_type MY_PTHREAD_LOCK_READ or MY_PTHREAD_LOCK_WRITE + + @retval 0 OK + @retval 1 Can't lock this block, need retry +*/ + +static my_bool translog_wait_lock(PAGECACHE *pagecache, + PAGECACHE_BLOCK_LINK *block, + PAGECACHE_FILE file, + pgcache_page_no_t pageno, + uint lock_type) +{ + /* Lock failed we will wait */ +#ifdef THREAD + struct st_my_thread_var *thread= my_thread_var; + DBUG_ENTER("translog_wait_lock"); + DBUG_PRINT("info", ("fail to lock, waiting... 0x%lx", (ulong)block)); + thread->lock_type= lock_type; + wqueue_add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread); + dec_counter_for_resize_op(pagecache); + do + { + KEYCACHE_DBUG_PRINT("get_wrlock: wait", + ("suspend thread %ld", thread->id)); + pagecache_pthread_cond_wait(&thread->suspend, + &pagecache->cache_lock); + } + while(thread->next); +#else + DBUG_ASSERT(0); +#endif + PCBLOCK_INFO(block); + if ((block->status & (PCBLOCK_REASSIGNED | PCBLOCK_IN_SWITCH)) || + file.file != block->hash_link->file.file || + pageno != block->hash_link->pageno) + { + DBUG_PRINT("info", ("the block 0x%lx changed => need retry " + "status: %x files %d != %d or pages %lu != %lu", + (ulong)block, block->status, + file.file, block->hash_link->file.file, + (ulong) pageno, (ulong) block->hash_link->pageno)); + DBUG_RETURN(1); + } + DBUG_RETURN(0); +} + +/** + @brief Put on the block write lock + + @parem pagecache pointer to a page cache data structure + @parem block the block to work with + + @note We have loose scheme for locking by the same thread: + * Downgrade to read lock if no other locks are taken + * Our scheme of locking allow for the same thread + - the same kind of lock + - taking read lock if write lock present + - downgrading to read lock if still other place the same + thread keep write lock + * But unlock operation number should be the same to lock operation. + * If we try to get read lock having active write locks we put read + locks to queue, and as soon as write lock(s) gone the read locks + from queue came in force. + * If read lock is unlocked earlier then it came to force it + just removed from the queue + + @retval 0 OK + @retval 1 Can't lock this block, need retry */ static my_bool get_wrlock(PAGECACHE *pagecache, @@ -2214,11 +2286,7 @@ static my_bool get_wrlock(PAGECACHE *pagecache, { PAGECACHE_FILE file= block->hash_link->file; pgcache_page_no_t pageno= block->hash_link->pageno; -#ifdef THREAD pthread_t locker= pthread_self(); -#else - int locker= 0; -#endif DBUG_ENTER("get_wrlock"); DBUG_PRINT("info", ("the block 0x%lx " "files %d(%d) pages %lu(%lu)", @@ -2226,37 +2294,17 @@ static my_bool get_wrlock(PAGECACHE *pagecache, file.file, block->hash_link->file.file, (ulong) pageno, (ulong) block->hash_link->pageno)); PCBLOCK_INFO(block); - while (block->wlocks && !pthread_equal(block->write_locker, locker)) + /* + We assume that the same thread will try write lock on block on which it + has already read lock. + */ + while ((block->wlocks && !pthread_equal(block->write_locker, locker)) || + block->rlocks) { /* Lock failed we will wait */ -#ifdef THREAD - struct st_my_thread_var *thread= my_thread_var; - DBUG_PRINT("info", ("fail to lock, waiting... 0x%lx", (ulong)block)); - wqueue_add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread); - dec_counter_for_resize_op(pagecache); - do - { - KEYCACHE_DBUG_PRINT("get_wrlock: wait", - ("suspend thread %ld", thread->id)); - pagecache_pthread_cond_wait(&thread->suspend, - &pagecache->cache_lock); - } - while(thread->next); -#else - DBUG_ASSERT(0); -#endif - PCBLOCK_INFO(block); - if ((block->status & (PCBLOCK_REASSIGNED | PCBLOCK_IN_SWITCH)) || - file.file != block->hash_link->file.file || - pageno != block->hash_link->pageno) - { - DBUG_PRINT("info", ("the block 0x%lx changed => need retry " - "status: %x files %d != %d or pages %lu != %lu", - (ulong)block, block->status, - file.file, block->hash_link->file.file, - (ulong) pageno, (ulong) block->hash_link->pageno)); + if (translog_wait_lock(pagecache, block, file, pageno, + MY_PTHREAD_LOCK_WRITE)) DBUG_RETURN(1); - } } /* we are doing it by global cache mutex protection, so it is OK */ block->wlocks++; @@ -2267,36 +2315,130 @@ static my_bool get_wrlock(PAGECACHE *pagecache, /* - Remove write lock from the block + @brief Put on the block read lock - SYNOPSIS - release_wrlock() - pagecache pointer to a page cache data structure - block the block to work with + @param pagecache pointer to a page cache data structure + @param block the block to work with + @param user_file Unique handler per handler file. Used to check if + we request many write locks withing the same + statement - RETURN - 0 - OK + @note see note for get_wrlock(). + + @retvalue 0 OK + @retvalue 1 Can't lock this block, need retry */ -static void release_wrlock(PAGECACHE_BLOCK_LINK *block) +static my_bool get_rdlock(PAGECACHE *pagecache, + PAGECACHE_BLOCK_LINK *block) +{ + PAGECACHE_FILE file= block->hash_link->file; + pgcache_page_no_t pageno= block->hash_link->pageno; + pthread_t locker= pthread_self(); + DBUG_ENTER("get_rdlock"); + DBUG_PRINT("info", ("the block 0x%lx " + "files %d(%d) pages %lu(%lu)", + (ulong) block, + file.file, block->hash_link->file.file, + (ulong) pageno, (ulong) block->hash_link->pageno)); + PCBLOCK_INFO(block); + while (block->wlocks && !pthread_equal(block->write_locker, locker)) + { + /* Lock failed we will wait */ + if (translog_wait_lock(pagecache, block, file, pageno, + MY_PTHREAD_LOCK_READ)) + DBUG_RETURN(1); + } + /* we are doing it by global cache mutex protection, so it is OK */ + if (block->wlocks) + { + DBUG_ASSERT(pthread_equal(block->write_locker, locker)); + block->rlocks_queue++; + DBUG_PRINT("info", ("RD lock put into queue, block 0x%lx", (ulong)block)); + } + else + { + block->rlocks++; + DBUG_PRINT("info", ("RD lock set, block 0x%lx", (ulong)block)); + } + DBUG_RETURN(0); +} + + +/* + @brief Remove write lock from the block + + @param pagecache pointer to a page cache data structure + @param block the block to work with + @param read_lock downgrade to read lock + + @note see note for get_wrlock(). +*/ + +static void release_wrlock(PAGECACHE_BLOCK_LINK *block, my_bool read_lock) { DBUG_ENTER("release_wrlock"); PCBLOCK_INFO(block); DBUG_ASSERT(block->wlocks > 0); + DBUG_ASSERT(block->rlocks == 0); DBUG_ASSERT(block->pins > 0); + if (read_lock) + block->rlocks_queue++; + if (block->wlocks == 1) + { + block->rlocks= block->rlocks_queue; + block->rlocks_queue= 0; + } block->wlocks--; if (block->wlocks > 0) DBUG_VOID_RETURN; /* Multiple write locked */ DBUG_PRINT("info", ("WR lock reset, block 0x%lx", (ulong)block)); #ifdef THREAD - /* release all threads waiting for write lock */ + /* release all threads waiting for read lock or one waiting for write */ if (block->wqueue[COND_FOR_WRLOCK].last_thread) - wqueue_release_queue(&block->wqueue[COND_FOR_WRLOCK]); + wqueue_release_one_locktype_from_queue(&block->wqueue[COND_FOR_WRLOCK]); #endif PCBLOCK_INFO(block); DBUG_VOID_RETURN; } +/* + @brief Remove read lock from the block + + @param pagecache pointer to a page cache data structure + @param block the block to work with + + @note see note for get_wrlock(). +*/ + +static void release_rdlock(PAGECACHE_BLOCK_LINK *block) +{ + DBUG_ENTER("release_wrlock"); + PCBLOCK_INFO(block); + if (block->wlocks) + { + DBUG_ASSERT(pthread_equal(block->write_locker, pthread_self())); + DBUG_ASSERT(block->rlocks == 0); + DBUG_ASSERT(block->rlocks_queue > 0); + block->rlocks_queue--; + DBUG_PRINT("info", ("RD lock queue decreased, block 0x%lx", (ulong)block)); + DBUG_VOID_RETURN; + } + DBUG_ASSERT(block->rlocks > 0); + DBUG_ASSERT(block->rlocks_queue == 0); + block->rlocks--; + DBUG_PRINT("info", ("RD lock decreased, block 0x%lx", (ulong)block)); + if (block->rlocks > 0) + DBUG_VOID_RETURN; /* Multiple write locked */ + DBUG_PRINT("info", ("RD lock reset, block 0x%lx", (ulong)block)); +#ifdef THREAD + /* release all threads waiting for read lock or one waiting for write */ + if (block->wqueue[COND_FOR_WRLOCK].last_thread) + wqueue_release_one_locktype_from_queue(&block->wqueue[COND_FOR_WRLOCK]); +#endif + PCBLOCK_INFO(block); + DBUG_VOID_RETURN; +} /* Try to lock/unlock and pin/unpin the block @@ -2325,9 +2467,10 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, #ifndef DBUG_OFF if (block) { - DBUG_PRINT("enter", ("block: 0x%lx (%u) wrlocks: %u pins: %u lock: %s pin: %s", + DBUG_PRINT("enter", ("block: 0x%lx (%u) wrlocks: %u rdlocks: %u " + "rdlocks_q: %u pins: %u lock: %s pin: %s", (ulong)block, PCBLOCK_NUMBER(pagecache, block), - block->wlocks, + block->wlocks, block->rlocks, block->rlocks_queue, block->pins, page_cache_page_lock_str[lock], page_cache_page_pin_str[pin])); @@ -2340,7 +2483,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, /* Writelock and pin the buffer */ if (get_wrlock(pagecache, block)) { - /* can't lock => need retry */ + /* Couldn't lock because block changed status => need retry */ goto retry; } @@ -2350,13 +2493,13 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, break; case PAGECACHE_LOCK_WRITE_TO_READ: /* write -> read */ case PAGECACHE_LOCK_WRITE_UNLOCK: /* write -> free */ - /* - Removes write lock and puts read lock (which is nothing in our - implementation) - */ - release_wrlock(block); + /* Removes write lock and puts read lock */ + release_wrlock(block, lock == PAGECACHE_LOCK_WRITE_TO_READ); /* fall through */ case PAGECACHE_LOCK_READ_UNLOCK: /* read -> free */ + if (lock == PAGECACHE_LOCK_READ_UNLOCK) + release_rdlock(block); + /* fall through */ case PAGECACHE_LOCK_LEFT_READLOCKED: /* read -> read */ if (pin == PAGECACHE_UNPIN) { @@ -2373,6 +2516,12 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, } break; case PAGECACHE_LOCK_READ: /* free -> read */ + if (get_rdlock(pagecache, block)) + { + /* Couldn't lock because block changed status => need retry */ + goto retry; + } + if (pin == PAGECACHE_PIN) { /* The cache is locked so nothing afraid off */ @@ -2385,6 +2534,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, { remove_pin(block); } + /* fall through */ case PAGECACHE_LOCK_LEFT_WRITELOCKED: /* write -> write */ break; /* do nothing */ default: @@ -3009,6 +3159,9 @@ restart: test((pin == PAGECACHE_PIN_LEFT_UNPINNED) || (pin == PAGECACHE_PIN)), &page_st); + DBUG_PRINT("info", ("Block type: %s current type %s", + page_cache_page_type_str[block->type], + page_cache_page_type_str[type])); if (((block->status & PCBLOCK_ERROR) == 0) && (page_st != PAGE_READ)) { DBUG_PRINT("info", ("read block 0x%lx", (ulong)block)); @@ -3751,6 +3904,8 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) unlink_changed(block); DBUG_ASSERT(block->wlocks == 0); + DBUG_ASSERT(block->rlocks == 0); + DBUG_ASSERT(block->rlocks_queue == 0); DBUG_ASSERT(block->pins == 0); block->status= 0; #ifndef DBUG_OFF @@ -3857,6 +4012,7 @@ static int flush_cached_blocks(PAGECACHE *pagecache, if (make_lock_and_pin(pagecache, block, PAGECACHE_LOCK_WRITE, PAGECACHE_PIN)) DBUG_ASSERT(0); + DBUG_ASSERT(block->pins == 1); KEYCACHE_DBUG_PRINT("flush_cached_blocks", ("block: %u (0x%lx) to be flushed", diff --git a/storage/maria/unittest/Makefile.am b/storage/maria/unittest/Makefile.am index eda4f5a3d57..e641ae77cb5 100644 --- a/storage/maria/unittest/Makefile.am +++ b/storage/maria/unittest/Makefile.am @@ -39,6 +39,7 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \ ma_pagecache_consist_64kRD-t \ ma_pagecache_consist_1kWR-t \ ma_pagecache_consist_64kWR-t \ + ma_pagecache_rwconsist_1k-t \ ma_test_loghandler-t \ ma_test_loghandler_multigroup-t \ ma_test_loghandler_multithread-t \ @@ -96,6 +97,9 @@ ma_pagecache_consist_1kWR_t_CPPFLAGS = $(ma_pagecache_common_cppflags) -DTEST_P ma_pagecache_consist_64kWR_t_SOURCES = $(ma_pagecache_consist_src) ma_pagecache_consist_64kWR_t_CPPFLAGS = $(ma_pagecache_common_cppflags) -DTEST_PAGE_SIZE=65536 -DTEST_WRITERS +ma_pagecache_rwconsist_1k_t_SOURCES = ma_pagecache_rwconsist.c +ma_pagecache_rwconsist_1k_t_CPPFLAGS = -DTEST_PAGE_SIZE=1024 + # the generic lock manager may not be used in the end and lockman1-t crashes, # so we don't build lockman-t and lockman1-t CLEANFILES = maria_log_control page_cache_test_file_1 \ diff --git a/storage/maria/unittest/ma_pagecache_consist.c b/storage/maria/unittest/ma_pagecache_consist.c index 3b726b3daea..33c458fd7c7 100644 --- a/storage/maria/unittest/ma_pagecache_consist.c +++ b/storage/maria/unittest/ma_pagecache_consist.c @@ -273,46 +273,49 @@ void writer(int num) static void *test_thread_reader(void *arg) { int param=*((int*) arg); - DBUG_ENTER("test_reader"); - my_thread_init(); - - DBUG_PRINT("enter", ("param: %d", param)); - - reader(param); - - DBUG_PRINT("info", ("Thread %s ended\n", my_thread_name())); - pthread_mutex_lock(&LOCK_thread_count); - ok(1, "reader%d: done", param); - thread_count--; - VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */ - pthread_mutex_unlock(&LOCK_thread_count); - free((uchar*) arg); - my_thread_end(); - DBUG_RETURN(0); + { + DBUG_ENTER("test_reader"); + DBUG_PRINT("enter", ("param: %d", param)); + + reader(param); + + DBUG_PRINT("info", ("Thread %s ended\n", my_thread_name())); + pthread_mutex_lock(&LOCK_thread_count); + ok(1, "reader%d: done", param); + thread_count--; + VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */ + pthread_mutex_unlock(&LOCK_thread_count); + free((uchar*) arg); + my_thread_end(); + } + return 0; } + static void *test_thread_writer(void *arg) { int param=*((int*) arg); - DBUG_ENTER("test_writer"); - my_thread_init(); - DBUG_PRINT("enter", ("param: %d", param)); - - writer(param); - - DBUG_PRINT("info", ("Thread %s ended\n", my_thread_name())); - pthread_mutex_lock(&LOCK_thread_count); - ok(1, "writer%d: done", param); - thread_count--; - VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */ - pthread_mutex_unlock(&LOCK_thread_count); - free((uchar*) arg); - my_thread_end(); - DBUG_RETURN(0); + { + DBUG_ENTER("test_writer"); + DBUG_PRINT("enter", ("param: %d", param)); + + writer(param); + + DBUG_PRINT("info", ("Thread %s ended\n", my_thread_name())); + pthread_mutex_lock(&LOCK_thread_count); + ok(1, "writer%d: done", param); + thread_count--; + VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */ + pthread_mutex_unlock(&LOCK_thread_count); + free((uchar*) arg); + my_thread_end(); + } + return 0; } + int main(int argc __attribute__((unused)), char **argv __attribute__((unused))) { diff --git a/storage/maria/unittest/ma_pagecache_rwconsist.c b/storage/maria/unittest/ma_pagecache_rwconsist.c new file mode 100644 index 00000000000..38d2917a2d8 --- /dev/null +++ b/storage/maria/unittest/ma_pagecache_rwconsist.c @@ -0,0 +1,348 @@ +/* + TODO: use pthread_join instead of wait_for_thread_count_to_be_zero, like in + my_atomic-t.c (see BUG#22320). +*/ + +#include <tap.h> +#include <my_sys.h> +#include <m_string.h> +#include "test_file.h" +#include <tap.h> + +#define PCACHE_SIZE (PAGE_SIZE*1024*8) + +#ifndef DBUG_OFF +static const char* default_dbug_option; +#endif + + +#define SLEEP usleep(5) + +static char *file1_name= (char*)"page_cache_test_file_1"; +static PAGECACHE_FILE file1; +static pthread_cond_t COND_thread_count; +static pthread_mutex_t LOCK_thread_count; +static uint thread_count; +static PAGECACHE pagecache; + +static uint number_of_readers= 5; +static uint number_of_writers= 5; +static uint number_of_read_tests= 2000; +static uint number_of_write_tests= 1000; +static uint read_sleep_limit= 3; +static uint report_divisor= 50; + +/** + @brief Dummy pagecache callback. +*/ + +static my_bool +dummy_callback(uchar *page __attribute__((unused)), + pgcache_page_no_t page_no __attribute__((unused)), + uchar* data_ptr __attribute__((unused))) +{ + return 0; +} + + +/** + @brief Dummy pagecache callback. +*/ + +static void +dummy_fail_callback(uchar* data_ptr __attribute__((unused))) +{ + return; +} + + +/** + @brief Checks page consistency + + @param buff pointer to the page content + @param task task ID +*/ +void check_page(uchar *buff, int task) +{ + uint i; + DBUG_ENTER("check_page"); + + for (i= 1; i < PAGE_SIZE; i++) + { + if (buff[0] != buff[i]) + goto err; + } + DBUG_VOID_RETURN; +err: + diag("Task %d char #%u '%u' != '%u'", task, i, (uint) buff[0], + (uint) buff[i]); + DBUG_PRINT("err", ("try to flush")); + exit(1); +} + + + +void reader(int num) +{ + unsigned char *buff; + uint i; + PAGECACHE_BLOCK_LINK *link; + + for (i= 0; i < number_of_read_tests; i++) + { + if (i % report_divisor == 0) + diag("Reader %d - %u", num, i); + buff= pagecache_read(&pagecache, &file1, 0, 3, NULL, + PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_READ, + &link); + check_page(buff, num); + pagecache_unlock_by_link(&pagecache, link, + PAGECACHE_LOCK_READ_UNLOCK, + PAGECACHE_UNPIN, 0, 0, 0); + { + int lim= rand() % read_sleep_limit; + int j; + for (j= 0; j < lim; j++) + SLEEP; + } + } +} + + +void writer(int num) +{ + uint i; + uchar *buff; + PAGECACHE_BLOCK_LINK *link; + + for (i= 0; i < number_of_write_tests; i++) + { + uchar c= (uchar) rand() % 256; + + if (i % report_divisor == 0) + diag("Writer %d - %u", num, i); + buff= pagecache_read(&pagecache, &file1, 0, 3, NULL, + PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_WRITE, + &link); + + check_page(buff, num); + bfill(buff, PAGE_SIZE / 2, c); + SLEEP; + bfill(buff + PAGE_SIZE/2, PAGE_SIZE / 2, c); + check_page(buff, num); + pagecache_unlock_by_link(&pagecache, link, + PAGECACHE_LOCK_WRITE_UNLOCK, + PAGECACHE_UNPIN, 0, 0, 1); + SLEEP; + } +} + + +static void *test_thread_reader(void *arg) +{ + int param=*((int*) arg); + my_thread_init(); + { + DBUG_ENTER("test_reader"); + + DBUG_PRINT("enter", ("param: %d", param)); + + reader(param); + + DBUG_PRINT("info", ("Thread %s ended\n", my_thread_name())); + pthread_mutex_lock(&LOCK_thread_count); + ok(1, "reader%d: done\n", param); + thread_count--; + VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */ + pthread_mutex_unlock(&LOCK_thread_count); + free((uchar*) arg); + my_thread_end(); + } + return 0; +} + + +static void *test_thread_writer(void *arg) +{ + int param=*((int*) arg); + my_thread_init(); + { + DBUG_ENTER("test_writer"); + + writer(param); + + DBUG_PRINT("info", ("Thread %s ended\n", my_thread_name())); + pthread_mutex_lock(&LOCK_thread_count); + ok(1, "writer%d: done\n", param); + thread_count--; + VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */ + pthread_mutex_unlock(&LOCK_thread_count); + free((uchar*) arg); + my_thread_end(); + } + return 0; +} + + +int main(int argc __attribute__((unused)), + char **argv __attribute__((unused))) +{ + pthread_t tid; + pthread_attr_t thr_attr; + int *param, error, pagen; + + MY_INIT(argv[0]); + +#ifndef DBUG_OFF +#if defined(__WIN__) + default_dbug_option= "d:t:i:O,\\test_pagecache_consist.trace"; +#else + default_dbug_option= "d:t:i:o,/tmp/test_pagecache_consist.trace"; +#endif + if (argc > 1) + { + DBUG_SET(default_dbug_option); + DBUG_SET_INITIAL(default_dbug_option); + } +#endif + + { + DBUG_ENTER("main"); + DBUG_PRINT("info", ("Main thread: %s\n", my_thread_name())); + plan(number_of_writers + number_of_readers); + SKIP_BIG_TESTS(number_of_writers + number_of_readers) + { + + if ((file1.file= my_open(file1_name, + O_CREAT | O_TRUNC | O_RDWR, MYF(0))) == -1) + { + diag( "Got error during file1 creation from open() (errno: %d)\n", + errno); + exit(1); + } + pagecache_file_init(file1, &dummy_callback, &dummy_callback, + &dummy_fail_callback, &dummy_callback, NULL); + DBUG_PRINT("info", ("file1: %d", file1.file)); + if (my_chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO, MYF(MY_WME))) + exit(1); + my_pwrite(file1.file, (const uchar*) "test file", 9, 0, MYF(0)); + + if ((error= pthread_cond_init(&COND_thread_count, NULL))) + { + diag( "COND_thread_count: %d from pthread_cond_init (errno: %d)\n", + error, errno); + exit(1); + } + if ((error= pthread_mutex_init(&LOCK_thread_count, MY_MUTEX_INIT_FAST))) + { + diag( "LOCK_thread_count: %d from pthread_cond_init (errno: %d)\n", + error, errno); + exit(1); + } + + if ((error= pthread_attr_init(&thr_attr))) + { + diag("Got error: %d from pthread_attr_init (errno: %d)\n", + error,errno); + exit(1); + } + if ((error= pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED))) + { + diag( + "Got error: %d from pthread_attr_setdetachstate (errno: %d)\n", + error,errno); + exit(1); + } + +#ifdef HAVE_THR_SETCONCURRENCY + VOID(thr_setconcurrency(2)); +#endif + + if ((pagen= init_pagecache(&pagecache, PCACHE_SIZE, 0, 0, + PAGE_SIZE, 0)) == 0) + { + diag("Got error: init_pagecache() (errno: %d)\n", + errno); + exit(1); + } + DBUG_PRINT("info", ("Page cache %d pages", pagen)); + { + unsigned char *buffr= malloc(PAGE_SIZE); + memset(buffr, '\0', PAGE_SIZE); + pagecache_write(&pagecache, &file1, 0, 3, buffr, + PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, + PAGECACHE_PIN_LEFT_UNPINNED, + PAGECACHE_WRITE_DELAY, + 0, LSN_IMPOSSIBLE); + } + pthread_mutex_lock(&LOCK_thread_count); + + while (number_of_readers != 0 || number_of_writers != 0) + { + if (number_of_readers != 0) + { + param=(int*) malloc(sizeof(int)); + *param= number_of_readers + number_of_writers; + if ((error= pthread_create(&tid, &thr_attr, test_thread_reader, + (void*) param))) + { + diag("Got error: %d from pthread_create (errno: %d)\n", + error,errno); + exit(1); + } + thread_count++; + number_of_readers--; + } + if (number_of_writers != 0) + { + param=(int*) malloc(sizeof(int)); + *param= number_of_writers + number_of_readers; + if ((error= pthread_create(&tid, &thr_attr, test_thread_writer, + (void*) param))) + { + diag("Got error: %d from pthread_create (errno: %d)\n", + error,errno); + exit(1); + } + thread_count++; + number_of_writers--; + } + } + DBUG_PRINT("info", ("Thread started")); + pthread_mutex_unlock(&LOCK_thread_count); + + pthread_attr_destroy(&thr_attr); + + /* wait finishing */ + pthread_mutex_lock(&LOCK_thread_count); + while (thread_count) + { + if ((error= pthread_cond_wait(&COND_thread_count,&LOCK_thread_count))) + diag("COND_thread_count: %d from pthread_cond_wait\n",error); + } + pthread_mutex_unlock(&LOCK_thread_count); + DBUG_PRINT("info", ("thread ended")); + + end_pagecache(&pagecache, 1); + DBUG_PRINT("info", ("Page cache ended")); + + if (my_close(file1.file, MYF(0)) != 0) + { + diag( "Got error during file1 closing from close() (errno: %d)\n", + errno); + exit(1); + } + my_delete(file1_name, MYF(0)); + + DBUG_PRINT("info", ("file1 (%d) closed", file1.file)); + DBUG_PRINT("info", ("Program end")); + + } /* SKIP_BIG_TESTS */ + my_end(0); + + return exit_status(); + } +} |