summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrei Elkin <andrei.elkin@mariadb.com>2018-07-28 12:15:05 +0300
committerAndrei Elkin <andrei.elkin@mariadb.com>2018-07-28 12:15:31 +0300
commitc9c765a6797441a2d24deaf9148309b8503b8e99 (patch)
tree0ae3e598acf252f03b696947fef2a7035b204dab
parent2c14c7d54364bd0b03d7daaf040079b99fc47ab8 (diff)
downloadmariadb-git-bb-10.1_MDEV-14014.tar.gz
MDEV-14014 Unittest extension to cover concurrent IO_CACHE read and write by the dump and user threads.bb-10.1_MDEV-14014
-rw-r--r--unittest/mysys/CMakeLists.txt2
-rw-r--r--unittest/mysys/my_io_cache_conc-t.c196
-rw-r--r--unittest/mysys/thr_template.c57
-rw-r--r--unittest/sql/CMakeLists.txt4
-rw-r--r--unittest/sql/mf_iocache-t.cc46
5 files changed, 301 insertions, 4 deletions
diff --git a/unittest/mysys/CMakeLists.txt b/unittest/mysys/CMakeLists.txt
index ad5195a843e..e72cc36dd23 100644
--- a/unittest/mysys/CMakeLists.txt
+++ b/unittest/mysys/CMakeLists.txt
@@ -14,7 +14,7 @@
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
MY_ADD_TESTS(bitmap base64 my_atomic my_rdtsc lf my_malloc my_getopt dynstring
- aes
+ aes my_io_cache_conc
LINK_LIBRARIES mysys)
MY_ADD_TESTS(my_vsnprintf LINK_LIBRARIES strings mysys)
diff --git a/unittest/mysys/my_io_cache_conc-t.c b/unittest/mysys/my_io_cache_conc-t.c
new file mode 100644
index 00000000000..0483862b1d2
--- /dev/null
+++ b/unittest/mysys/my_io_cache_conc-t.c
@@ -0,0 +1,196 @@
+/* Copyright (c) 2006, 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#define MY_IO_CACHE_CONC
+int n_writers=2;
+int n_readers=20;
+unsigned long long n_messages=2000;
+int cache_read_with_care=1;
+
+#include "thr_template.c"
+
+
+#define FILL 0x5A
+#define CACHE_SIZE 16384
+//IO_CACHE info;
+#define INFO_TAIL ", pos_in_file = %llu, pos_in_mem = %lu\n", \
+ ptr_log->pos_in_file, (*ptr_log->current_pos - ptr_log->request_pos)
+
+#define BUF_SIZE 2000
+#define HDR_SIZE 8
+
+my_off_t _end_pos;
+uint last_written= 0;
+
+IO_CACHE write_log;
+
+
+void set_end_pos(my_off_t val)
+{
+ // mutext must be hold
+ _end_pos= val;
+ pthread_cond_broadcast(&cond2);
+}
+
+
+pthread_handler_t writer(void *arg)
+{
+ uchar buf[BUF_SIZE];
+ longlong param= *(ulonglong*) arg;
+ IO_CACHE *ptr_log= &write_log;
+
+ my_thread_init();
+
+ memset(buf, FILL, sizeof(buf));
+
+ diag("MDEV-14014 Dump thread reads past last 'officially' written byte");
+
+ for (; param > 0; param--)
+ {
+ int res;
+ // Generate a message of arb size that has at least 1 byte of payload
+ uint32 size= rand() % (BUF_SIZE - HDR_SIZE - 1) + HDR_SIZE + 1;
+ int4store(buf, size );
+ // Lock
+ pthread_mutex_lock(&mutex);
+ int4store(buf + 4, ++last_written);
+ res= my_b_write(ptr_log, buf, size);
+ //ok(res == 0, "buffer is written" INFO_TAIL);
+ res= my_b_flush_io_cache(ptr_log, 1);
+ set_end_pos(my_b_write_tell(ptr_log));
+ pthread_mutex_unlock(&mutex);
+ // Unlock
+ //ok(res == 0, "flush" INFO_TAIL);
+ }
+
+ pthread_mutex_lock(&mutex);
+ if (!--running_threads)
+ pthread_cond_signal(&cond);
+ pthread_mutex_unlock(&mutex);
+
+ my_thread_end();
+ return 0;
+}
+
+my_off_t get_end_pos()
+{
+ my_off_t ret;
+ pthread_mutex_lock(&mutex);
+ ret= _end_pos;
+ pthread_mutex_unlock(&mutex);
+
+ return ret;
+}
+
+my_off_t wait_new_events()
+{
+ my_off_t ret;
+
+ pthread_mutex_lock(&mutex);
+ pthread_cond_wait(&cond2, &mutex);
+ ret= _end_pos;
+ pthread_mutex_unlock(&mutex);
+
+ return ret;
+}
+
+pthread_handler_t reader(void *arg)
+{
+ int res;
+ uchar buf[BUF_SIZE];
+ File file= -1;
+ const char *log_file_name="my.log";
+ IO_CACHE read_log;
+ IO_CACHE *ptr_log= &read_log;
+ longlong n_messages= (*(longlong*) arg) * n_writers;
+ my_off_t log_pos;
+ //uint last_read= 0;
+
+ my_thread_init();
+
+ memset(buf, 0, sizeof( buf));
+
+ diag("MDEV-14014 Dump thread reads past last 'officially' written byte");
+
+ file= my_open(//key_file_binlog,
+ log_file_name, O_CREAT | O_RDONLY | O_BINARY | O_SHARE,
+ MYF(MY_WME));
+ //ok(file >= 0, "mysql_file_open\n");
+ res= init_io_cache(ptr_log, file, IO_SIZE*2, READ_CACHE, 0, 0,
+ MYF(MY_WME|MY_DONT_CHECK_FILESIZE));
+ //ok(res == 0, "init_io_cache");
+
+ log_pos= my_b_tell(ptr_log);
+ for (; n_messages > 0;)
+ {
+ my_off_t end_pos= get_end_pos();
+ size_t size;
+
+ if (log_pos >= end_pos)
+ end_pos= wait_new_events();
+
+ if (cache_read_with_care)
+ ptr_log->end_of_file= end_pos;
+
+ while (log_pos < end_pos)
+ {
+ // Read a message in two steps
+ res= my_b_read(ptr_log, buf, HDR_SIZE);
+ //ok(res == 0, "my_b_read HDR_SIZE");
+ size= uint4korr(buf);
+ ok(size >= HDR_SIZE && size <= BUF_SIZE, "msg size within HDR_SIZE, BUF_SIZE\n");
+ //ok(uint4korr(buf+4) == ++last_read, "current msg number succeeds the last one\n");
+ res= my_b_read(ptr_log, buf + HDR_SIZE, size - HDR_SIZE);
+ //ok(res == 0, "my_b_read payload");
+ ok(res == 0 && buf[HDR_SIZE] == FILL && buf[size - 1] == FILL, "my_b_read sane");
+
+ n_messages--;
+ //ok(n_messages >= 0, "param is not negative");
+ log_pos= my_b_tell(ptr_log);
+ }
+ }
+ //my_sleep(1000000);
+ close_cached_file(ptr_log);
+
+ pthread_mutex_lock(&mutex);
+ if (!--running_threads)
+ pthread_cond_signal(&cond);
+ pthread_mutex_unlock(&mutex);
+
+ my_thread_end();
+ return 0;
+}
+
+
+void do_tests()
+{
+ const char *log_file_name="my.log";
+ File file= my_open(//key_file_binlog,
+ log_file_name, O_CREAT | O_RDWR | O_BINARY | O_SHARE,
+ MYF(MY_WME));
+ int res;
+ IO_CACHE *ptr_log= &write_log;
+
+ ok(file >= 0, "mysql_file_open\n");
+ res= init_io_cache(ptr_log, file, IO_SIZE*2, WRITE_CACHE, 0, 0,
+ MYF(MY_WME|MY_DONT_CHECK_FILESIZE));
+ ok(res == 0, "init_io_cache");
+
+ test_concurrently2("my_io_cache_conc", writer, reader,
+ n_writers, n_readers, n_messages);
+ //my_sync(ptr_log->file, MYF(MY_WME|MY_SYNC_FILESIZE));
+ close_cached_file(ptr_log);
+
+}
diff --git a/unittest/mysys/thr_template.c b/unittest/mysys/thr_template.c
index 38999022da0..7cdd4f2043a 100644
--- a/unittest/mysys/thr_template.c
+++ b/unittest/mysys/thr_template.c
@@ -23,6 +23,7 @@ volatile uint32 bad;
pthread_attr_t thr_attr;
pthread_mutex_t mutex;
pthread_cond_t cond;
+pthread_cond_t cond2;
uint running_threads;
void do_tests();
@@ -52,15 +53,66 @@ void test_concurrently(const char *test, pthread_handler handler, int n, int m)
ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e9, bad);
}
+void test_concurrently2(const char *test, pthread_handler handler1,
+ pthread_handler handler2, int n1, int n2, ulonglong param_h)
+{
+ pthread_t t;
+ ulonglong now= my_interval_timer();
+
+ bad= 0;
+
+ diag("Testing %s with %d+%d threads... ", test, n1, n2);
+ running_threads= n1 + n2;
+ for (; n1 ; n1--)
+ {
+ if (pthread_create(&t, &thr_attr, handler1, &param_h) != 0)
+ {
+ diag("Could not create thread");
+ abort();
+ }
+ }
+ for (; n2 ; n2--)
+ {
+ if (pthread_create(&t, &thr_attr, handler2, &param_h) != 0)
+ {
+ diag("Could not create thread");
+ abort();
+ }
+ }
+ pthread_mutex_lock(&mutex);
+ while (running_threads)
+ pthread_cond_wait(&cond, &mutex);
+ pthread_mutex_unlock(&mutex);
+
+ now= my_interval_timer() - now;
+ ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e9, bad);
+}
+
+#ifdef MY_IO_CACHE_CONC
+int main(int argc, char **argv)
+#else
int main(int argc __attribute__((unused)), char **argv)
+#endif
{
MY_INIT(argv[0]);
+#ifdef MY_IO_CACHE_CONC
+ if (argc > 1)
+ n_readers= atoi(argv[1]);
+ if (argc > 2)
+ n_writers= atoi(argv[2]);
+ if (argc > 3)
+ n_messages= atoi(argv[3]);
+ if (argc > 4)
+ cache_read_with_care= atoi(argv[4]);
+#else
if (argv[1] && *argv[1])
- DBUG_SET_INITIAL(argv[1]);
+ DBUG_SET_INITIAL(argv[1]);
+#endif
pthread_mutex_init(&mutex, 0);
- pthread_cond_init(&cond, 0);
+ pthread_cond_init(&cond, 0);
+ pthread_cond_init(&cond2, 0);
pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
@@ -80,6 +132,7 @@ int main(int argc __attribute__((unused)), char **argv)
#endif
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
+ pthread_cond_destroy(&cond2);
pthread_attr_destroy(&thr_attr);
my_end(0);
return exit_status();
diff --git a/unittest/sql/CMakeLists.txt b/unittest/sql/CMakeLists.txt
index cd2ba9b3d2f..6eab8b65ae1 100644
--- a/unittest/sql/CMakeLists.txt
+++ b/unittest/sql/CMakeLists.txt
@@ -34,3 +34,7 @@ ADD_EXECUTABLE(mf_iocache-t mf_iocache-t.cc ../../sql/mf_iocache_encr.cc)
TARGET_LINK_LIBRARIES(mf_iocache-t mysys mytap)
ADD_DEPENDENCIES(mf_iocache-t GenError)
MY_ADD_TEST(mf_iocache)
+
+# ADD_EXECUTABLE(my_io_cache_conc-t my_io_cache_conc-t.cc)
+# TARGET_LINK_LIBRARIES(my_io_cache_conc-t mysys)
+# MY_ADD_TEST(my_io_cache_conc)
diff --git a/unittest/sql/mf_iocache-t.cc b/unittest/sql/mf_iocache-t.cc
index 2cd5b678700..e186b9c9207 100644
--- a/unittest/sql/mf_iocache-t.cc
+++ b/unittest/sql/mf_iocache-t.cc
@@ -16,6 +16,7 @@
#include <my_sys.h>
#include <my_crypt.h>
#include <tap.h>
+#include <mysql/psi/mysql_file.h>
/*** tweaks and stubs for encryption code to compile ***************/
#define KEY_SIZE (128/8)
@@ -285,11 +286,53 @@ void mdev14014()
close_cached_file(&info);
}
+void mdev14014_2()
+{
+ int res;
+ uchar buf_o[200];
+ uchar buf_i[200];
+ File file= -1;
+ const char *log_file_name="my.log";
+ IO_CACHE log;
+
+ memset(buf_i, 0, sizeof( buf_i));
+ memset(buf_o, FILL, sizeof(buf_o));
+
+ diag("MDEV-14014 Dump thread reads past last 'officially' written byte");
+
+ init_io_cache_encryption();
+ file= my_open(//key_file_binlog,
+ log_file_name, O_CREAT | O_RDWR | O_BINARY | O_SHARE,
+ MYF(MY_WME));
+ ok(file >= 0, "mysql_file_open");
+ res= init_io_cache(&log, file, IO_SIZE*2, WRITE_CACHE, 0, 0,
+ MYF(MY_WME|MY_DONT_CHECK_FILESIZE));
+ ok(res == 0, "init_io_cache");
+ //res= open_cached_file(&info, 0, 0, CACHE_SIZE, 0);
+ //ok(res == 0, "open_cached_file" INFO_TAIL);
+
+ res= my_b_write(&log, buf_o, sizeof(buf_o));
+ ok(res == 0, "buffer is written" INFO_TAIL);
+
+ res= my_b_flush_io_cache(&log, 1);
+ ok(res == 0, "flush" INFO_TAIL);
+
+ res= reinit_io_cache(&log, READ_CACHE, 0, 0, 0);
+ ok(res == 0, "reinit READ_CACHE" INFO_TAIL);
+
+ log.end_of_file= 100;
+ res= my_b_read(&log, buf_i, sizeof(buf_i));
+ ok(res == 1 && buf_i[100] == 0 && buf_i[200-1] == 0,
+ "short read leaves buf_i[100..200-1] == 0");
+
+ close_cached_file(&log);
+}
+
int main(int argc __attribute__((unused)),char *argv[])
{
MY_INIT(argv[0]);
- plan(51);
+ plan(57);
/* temp files with and without encryption */
encrypt_tmp_files= 1;
@@ -306,6 +349,7 @@ int main(int argc __attribute__((unused)),char *argv[])
encrypt_tmp_files= 0;
mdev14014();
+ mdev14014_2();
my_end(0);
return exit_status();