diff options
author | Andrei Elkin <andrei.elkin@mariadb.com> | 2018-07-28 12:15:05 +0300 |
---|---|---|
committer | Andrei Elkin <andrei.elkin@mariadb.com> | 2018-07-28 12:15:31 +0300 |
commit | c9c765a6797441a2d24deaf9148309b8503b8e99 (patch) | |
tree | 0ae3e598acf252f03b696947fef2a7035b204dab | |
parent | 2c14c7d54364bd0b03d7daaf040079b99fc47ab8 (diff) | |
download | mariadb-git-bb-10.1_MDEV-14014.tar.gz |
MDEV-14014 Unittest extension to cover concurrent IO_CACHE read and write by the dump and user threads.bb-10.1_MDEV-14014
-rw-r--r-- | unittest/mysys/CMakeLists.txt | 2 | ||||
-rw-r--r-- | unittest/mysys/my_io_cache_conc-t.c | 196 | ||||
-rw-r--r-- | unittest/mysys/thr_template.c | 57 | ||||
-rw-r--r-- | unittest/sql/CMakeLists.txt | 4 | ||||
-rw-r--r-- | unittest/sql/mf_iocache-t.cc | 46 |
5 files changed, 301 insertions, 4 deletions
diff --git a/unittest/mysys/CMakeLists.txt b/unittest/mysys/CMakeLists.txt index ad5195a843e..e72cc36dd23 100644 --- a/unittest/mysys/CMakeLists.txt +++ b/unittest/mysys/CMakeLists.txt @@ -14,7 +14,7 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA MY_ADD_TESTS(bitmap base64 my_atomic my_rdtsc lf my_malloc my_getopt dynstring - aes + aes my_io_cache_conc LINK_LIBRARIES mysys) MY_ADD_TESTS(my_vsnprintf LINK_LIBRARIES strings mysys) diff --git a/unittest/mysys/my_io_cache_conc-t.c b/unittest/mysys/my_io_cache_conc-t.c new file mode 100644 index 00000000000..0483862b1d2 --- /dev/null +++ b/unittest/mysys/my_io_cache_conc-t.c @@ -0,0 +1,196 @@ +/* Copyright (c) 2006, 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#define MY_IO_CACHE_CONC +int n_writers=2; +int n_readers=20; +unsigned long long n_messages=2000; +int cache_read_with_care=1; + +#include "thr_template.c" + + +#define FILL 0x5A +#define CACHE_SIZE 16384 +//IO_CACHE info; +#define INFO_TAIL ", pos_in_file = %llu, pos_in_mem = %lu\n", \ + ptr_log->pos_in_file, (*ptr_log->current_pos - ptr_log->request_pos) + +#define BUF_SIZE 2000 +#define HDR_SIZE 8 + +my_off_t _end_pos; +uint last_written= 0; + +IO_CACHE write_log; + + +void set_end_pos(my_off_t val) +{ + // mutext must be hold + _end_pos= val; + pthread_cond_broadcast(&cond2); +} + + +pthread_handler_t writer(void *arg) +{ + uchar buf[BUF_SIZE]; + longlong param= *(ulonglong*) arg; + IO_CACHE *ptr_log= &write_log; + + my_thread_init(); + + memset(buf, FILL, sizeof(buf)); + + diag("MDEV-14014 Dump thread reads past last 'officially' written byte"); + + for (; param > 0; param--) + { + int res; + // Generate a message of arb size that has at least 1 byte of payload + uint32 size= rand() % (BUF_SIZE - HDR_SIZE - 1) + HDR_SIZE + 1; + int4store(buf, size ); + // Lock + pthread_mutex_lock(&mutex); + int4store(buf + 4, ++last_written); + res= my_b_write(ptr_log, buf, size); + //ok(res == 0, "buffer is written" INFO_TAIL); + res= my_b_flush_io_cache(ptr_log, 1); + set_end_pos(my_b_write_tell(ptr_log)); + pthread_mutex_unlock(&mutex); + // Unlock + //ok(res == 0, "flush" INFO_TAIL); + } + + pthread_mutex_lock(&mutex); + if (!--running_threads) + pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); + + my_thread_end(); + return 0; +} + +my_off_t get_end_pos() +{ + my_off_t ret; + pthread_mutex_lock(&mutex); + ret= _end_pos; + pthread_mutex_unlock(&mutex); + + return ret; +} + +my_off_t wait_new_events() +{ + my_off_t ret; + + pthread_mutex_lock(&mutex); + pthread_cond_wait(&cond2, &mutex); + ret= _end_pos; + pthread_mutex_unlock(&mutex); + + return ret; +} + +pthread_handler_t reader(void *arg) +{ + int res; + uchar buf[BUF_SIZE]; + File file= -1; + const char *log_file_name="my.log"; + IO_CACHE read_log; + IO_CACHE *ptr_log= &read_log; + longlong n_messages= (*(longlong*) arg) * n_writers; + my_off_t log_pos; + //uint last_read= 0; + + my_thread_init(); + + memset(buf, 0, sizeof( buf)); + + diag("MDEV-14014 Dump thread reads past last 'officially' written byte"); + + file= my_open(//key_file_binlog, + log_file_name, O_CREAT | O_RDONLY | O_BINARY | O_SHARE, + MYF(MY_WME)); + //ok(file >= 0, "mysql_file_open\n"); + res= init_io_cache(ptr_log, file, IO_SIZE*2, READ_CACHE, 0, 0, + MYF(MY_WME|MY_DONT_CHECK_FILESIZE)); + //ok(res == 0, "init_io_cache"); + + log_pos= my_b_tell(ptr_log); + for (; n_messages > 0;) + { + my_off_t end_pos= get_end_pos(); + size_t size; + + if (log_pos >= end_pos) + end_pos= wait_new_events(); + + if (cache_read_with_care) + ptr_log->end_of_file= end_pos; + + while (log_pos < end_pos) + { + // Read a message in two steps + res= my_b_read(ptr_log, buf, HDR_SIZE); + //ok(res == 0, "my_b_read HDR_SIZE"); + size= uint4korr(buf); + ok(size >= HDR_SIZE && size <= BUF_SIZE, "msg size within HDR_SIZE, BUF_SIZE\n"); + //ok(uint4korr(buf+4) == ++last_read, "current msg number succeeds the last one\n"); + res= my_b_read(ptr_log, buf + HDR_SIZE, size - HDR_SIZE); + //ok(res == 0, "my_b_read payload"); + ok(res == 0 && buf[HDR_SIZE] == FILL && buf[size - 1] == FILL, "my_b_read sane"); + + n_messages--; + //ok(n_messages >= 0, "param is not negative"); + log_pos= my_b_tell(ptr_log); + } + } + //my_sleep(1000000); + close_cached_file(ptr_log); + + pthread_mutex_lock(&mutex); + if (!--running_threads) + pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); + + my_thread_end(); + return 0; +} + + +void do_tests() +{ + const char *log_file_name="my.log"; + File file= my_open(//key_file_binlog, + log_file_name, O_CREAT | O_RDWR | O_BINARY | O_SHARE, + MYF(MY_WME)); + int res; + IO_CACHE *ptr_log= &write_log; + + ok(file >= 0, "mysql_file_open\n"); + res= init_io_cache(ptr_log, file, IO_SIZE*2, WRITE_CACHE, 0, 0, + MYF(MY_WME|MY_DONT_CHECK_FILESIZE)); + ok(res == 0, "init_io_cache"); + + test_concurrently2("my_io_cache_conc", writer, reader, + n_writers, n_readers, n_messages); + //my_sync(ptr_log->file, MYF(MY_WME|MY_SYNC_FILESIZE)); + close_cached_file(ptr_log); + +} diff --git a/unittest/mysys/thr_template.c b/unittest/mysys/thr_template.c index 38999022da0..7cdd4f2043a 100644 --- a/unittest/mysys/thr_template.c +++ b/unittest/mysys/thr_template.c @@ -23,6 +23,7 @@ volatile uint32 bad; pthread_attr_t thr_attr; pthread_mutex_t mutex; pthread_cond_t cond; +pthread_cond_t cond2; uint running_threads; void do_tests(); @@ -52,15 +53,66 @@ void test_concurrently(const char *test, pthread_handler handler, int n, int m) ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e9, bad); } +void test_concurrently2(const char *test, pthread_handler handler1, + pthread_handler handler2, int n1, int n2, ulonglong param_h) +{ + pthread_t t; + ulonglong now= my_interval_timer(); + + bad= 0; + + diag("Testing %s with %d+%d threads... ", test, n1, n2); + running_threads= n1 + n2; + for (; n1 ; n1--) + { + if (pthread_create(&t, &thr_attr, handler1, ¶m_h) != 0) + { + diag("Could not create thread"); + abort(); + } + } + for (; n2 ; n2--) + { + if (pthread_create(&t, &thr_attr, handler2, ¶m_h) != 0) + { + diag("Could not create thread"); + abort(); + } + } + pthread_mutex_lock(&mutex); + while (running_threads) + pthread_cond_wait(&cond, &mutex); + pthread_mutex_unlock(&mutex); + + now= my_interval_timer() - now; + ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e9, bad); +} + +#ifdef MY_IO_CACHE_CONC +int main(int argc, char **argv) +#else int main(int argc __attribute__((unused)), char **argv) +#endif { MY_INIT(argv[0]); +#ifdef MY_IO_CACHE_CONC + if (argc > 1) + n_readers= atoi(argv[1]); + if (argc > 2) + n_writers= atoi(argv[2]); + if (argc > 3) + n_messages= atoi(argv[3]); + if (argc > 4) + cache_read_with_care= atoi(argv[4]); +#else if (argv[1] && *argv[1]) - DBUG_SET_INITIAL(argv[1]); + DBUG_SET_INITIAL(argv[1]); +#endif pthread_mutex_init(&mutex, 0); - pthread_cond_init(&cond, 0); + pthread_cond_init(&cond, 0); + pthread_cond_init(&cond2, 0); pthread_attr_init(&thr_attr); pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); @@ -80,6 +132,7 @@ int main(int argc __attribute__((unused)), char **argv) #endif pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); + pthread_cond_destroy(&cond2); pthread_attr_destroy(&thr_attr); my_end(0); return exit_status(); diff --git a/unittest/sql/CMakeLists.txt b/unittest/sql/CMakeLists.txt index cd2ba9b3d2f..6eab8b65ae1 100644 --- a/unittest/sql/CMakeLists.txt +++ b/unittest/sql/CMakeLists.txt @@ -34,3 +34,7 @@ ADD_EXECUTABLE(mf_iocache-t mf_iocache-t.cc ../../sql/mf_iocache_encr.cc) TARGET_LINK_LIBRARIES(mf_iocache-t mysys mytap) ADD_DEPENDENCIES(mf_iocache-t GenError) MY_ADD_TEST(mf_iocache) + +# ADD_EXECUTABLE(my_io_cache_conc-t my_io_cache_conc-t.cc) +# TARGET_LINK_LIBRARIES(my_io_cache_conc-t mysys) +# MY_ADD_TEST(my_io_cache_conc) diff --git a/unittest/sql/mf_iocache-t.cc b/unittest/sql/mf_iocache-t.cc index 2cd5b678700..e186b9c9207 100644 --- a/unittest/sql/mf_iocache-t.cc +++ b/unittest/sql/mf_iocache-t.cc @@ -16,6 +16,7 @@ #include <my_sys.h> #include <my_crypt.h> #include <tap.h> +#include <mysql/psi/mysql_file.h> /*** tweaks and stubs for encryption code to compile ***************/ #define KEY_SIZE (128/8) @@ -285,11 +286,53 @@ void mdev14014() close_cached_file(&info); } +void mdev14014_2() +{ + int res; + uchar buf_o[200]; + uchar buf_i[200]; + File file= -1; + const char *log_file_name="my.log"; + IO_CACHE log; + + memset(buf_i, 0, sizeof( buf_i)); + memset(buf_o, FILL, sizeof(buf_o)); + + diag("MDEV-14014 Dump thread reads past last 'officially' written byte"); + + init_io_cache_encryption(); + file= my_open(//key_file_binlog, + log_file_name, O_CREAT | O_RDWR | O_BINARY | O_SHARE, + MYF(MY_WME)); + ok(file >= 0, "mysql_file_open"); + res= init_io_cache(&log, file, IO_SIZE*2, WRITE_CACHE, 0, 0, + MYF(MY_WME|MY_DONT_CHECK_FILESIZE)); + ok(res == 0, "init_io_cache"); + //res= open_cached_file(&info, 0, 0, CACHE_SIZE, 0); + //ok(res == 0, "open_cached_file" INFO_TAIL); + + res= my_b_write(&log, buf_o, sizeof(buf_o)); + ok(res == 0, "buffer is written" INFO_TAIL); + + res= my_b_flush_io_cache(&log, 1); + ok(res == 0, "flush" INFO_TAIL); + + res= reinit_io_cache(&log, READ_CACHE, 0, 0, 0); + ok(res == 0, "reinit READ_CACHE" INFO_TAIL); + + log.end_of_file= 100; + res= my_b_read(&log, buf_i, sizeof(buf_i)); + ok(res == 1 && buf_i[100] == 0 && buf_i[200-1] == 0, + "short read leaves buf_i[100..200-1] == 0"); + + close_cached_file(&log); +} + int main(int argc __attribute__((unused)),char *argv[]) { MY_INIT(argv[0]); - plan(51); + plan(57); /* temp files with and without encryption */ encrypt_tmp_files= 1; @@ -306,6 +349,7 @@ int main(int argc __attribute__((unused)),char *argv[]) encrypt_tmp_files= 0; mdev14014(); + mdev14014_2(); my_end(0); return exit_status(); |