summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <brian@zim.tangent.org>2006-02-13 00:45:59 -0800
committerunknown <brian@zim.tangent.org>2006-02-13 00:45:59 -0800
commitb8d179f3e00d9e72a3391c9f69d324fa08b5a1f5 (patch)
treea1e383dff70acb11f347b245f59fabed944ac3ce
parent6bfbadc8ee1d16b0a4efab838b44f88f1d5b032a (diff)
downloadmariadb-git-b8d179f3e00d9e72a3391c9f69d324fa08b5a1f5.tar.gz
Adding thread support for mysqlimport. You can now specify a number of threads to use and it will thread the loading of the database. Anyone who has had to go through the pain of loading the database will immediatly get the reason for this.
client/Makefile.am: Adding client_thread_libs for mysqlimport (aka it gets pthreads) client/client_priv.h: New option client/mysqlimport.c: Reworked logic to allow someone to use threads. mysql-test/r/mysqldump.result: New results mysql-test/t/mysqldump.test: Added tests for threads.
-rw-r--r--client/Makefile.am1
-rw-r--r--client/client_priv.h1
-rw-r--r--client/mysqlimport.c119
-rw-r--r--mysql-test/r/mysqldump.result32
-rw-r--r--mysql-test/t/mysqldump.test24
5 files changed, 160 insertions, 17 deletions
diff --git a/client/Makefile.am b/client/Makefile.am
index 849bd37eb57..6a851cf2f2c 100644
--- a/client/Makefile.am
+++ b/client/Makefile.am
@@ -49,6 +49,7 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc $(top_srcdir)/mysys/mf_tempdir.c \
$(top_srcdir)/mysys/base64.c
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
mysqlslap_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS)
+mysqlimport_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS)
mysqltestmanager_pwgen_SOURCES = mysqlmanager-pwgen.c
mysqltestmanagerc_SOURCES= mysqlmanagerc.c $(yassl_dummy_link_fix)
mysqlcheck_SOURCES= mysqlcheck.c $(yassl_dummy_link_fix)
diff --git a/client/client_priv.h b/client/client_priv.h
index 44f3ce227af..2763dabb027 100644
--- a/client/client_priv.h
+++ b/client/client_priv.h
@@ -54,6 +54,7 @@ enum options_client
OPT_MYSQL_LOCK_DIRECTORY,
OPT_MYSQL_SLAP_SLAVE,
OPT_USE_THREADS,
+ OPT_IMPORT_USE_THREADS,
OPT_MYSQL_NUMBER_OF_QUERY,
OPT_MYSQL_PRESERVE_SCHEMA,
OPT_IGNORE_TABLE,OPT_INSERT_IGNORE,OPT_SHOW_WARNINGS,OPT_DROP_DATABASE,
diff --git a/client/mysqlimport.c b/client/mysqlimport.c
index 8694093f06b..946ea645230 100644
--- a/client/mysqlimport.c
+++ b/client/mysqlimport.c
@@ -29,6 +29,10 @@
#include "client_priv.h"
#include "mysql_version.h"
+#include <my_pthread.h>
+
+/* Global Thread counter */
+int counter= 0;
static void db_error_with_table(MYSQL *mysql, char *table);
static void db_error(MYSQL *mysql);
@@ -39,6 +43,7 @@ static char *add_load_option(char *ptr,const char *object,
static my_bool verbose=0,lock_tables=0,ignore_errors=0,opt_delete=0,
replace=0,silent=0,ignore=0,opt_compress=0,
opt_low_priority= 0, tty_password= 0;
+static my_bool opt_use_threads= 0;
static uint opt_local_file=0;
static MYSQL mysql_connection;
static char *opt_password=0, *current_user=0,
@@ -138,6 +143,11 @@ static struct my_option my_long_options[] =
(gptr*) &opt_mysql_unix_port, (gptr*) &opt_mysql_unix_port, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#include <sslopt-longopts.h>
+ {"use-threads", OPT_USE_THREADS,
+ "Parrelize the loading of files. Requires an arguement for the number \
+ threads to use for loading of data.",
+ (gptr*) &opt_use_threads, (gptr*) &opt_use_threads, 0,
+ GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DONT_ALLOW_USER_CHANGE
{"user", 'u', "User for login if not current user.", (gptr*) &current_user,
(gptr*) &current_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
@@ -287,7 +297,7 @@ static int write_to_table(char *filename, MYSQL *sock)
{
if (verbose)
fprintf(stdout, "Deleting the old data from table %s\n", tablename);
- sprintf(sql_statement, "DELETE FROM %s", tablename);
+ snprintf(sql_statement, FN_REFLEN*16+256, "DELETE FROM %s", tablename);
if (mysql_query(sock, sql_statement))
{
db_error_with_table(sock, tablename);
@@ -497,12 +507,39 @@ static char *field_escape(char *to,const char *from,uint length)
}
+int
+worker_thread(char *raw_table_name)
+{
+ MYSQL *sock= 0;
+ if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
+ {
+ goto error;
+ }
+
+ if (mysql_query(sock, "set @@character_set_database=binary;"))
+ {
+ db_error(sock); /* We shall countinue here, if --force was given */
+ goto error;
+ }
+
+ /*
+ We should do something about the error
+ */
+ write_to_table(raw_table_name, sock);
+
+error:
+ if (sock)
+ db_disconnect(current_host, sock);
+
+ counter--;
+ return 0;
+}
+
int main(int argc, char **argv)
{
int exitcode=0, error=0;
char **argv_to_free;
- MYSQL *sock=0;
MY_INIT(argv[0]);
load_defaults("my",load_default_groups,&argc,&argv);
@@ -513,25 +550,73 @@ int main(int argc, char **argv)
free_defaults(argv_to_free);
return(1);
}
- if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
- {
- free_defaults(argv_to_free);
- return(1); /* purecov: deadcode */
- }
- if (mysql_query(sock, "set @@character_set_database=binary;"))
+ if (opt_use_threads)
{
- db_error(sock); /* We shall countinue here, if --force was given */
- return(1);
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+
+ for (; *argv != NULL; argv++) // Loop through tables
+ {
+ /*
+ If we hit thread count limit we loop until some threads exit.
+ We sleep for a second, so that we don't chew up a lot of
+ CPU in the loop.
+ */
+sanity_label:
+ if (counter == opt_use_threads)
+ {
+ sleep(1);
+ goto sanity_label;
+ }
+ counter++;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ /* now create the thread */
+ if (pthread_create(&mainthread, &attr, (void *)worker_thread,
+ (void *)*argv) != 0)
+ {
+ counter--;
+ fprintf(stderr,"%s: Could not create thread\n",
+ my_progname);
+ }
+ }
+
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+loop_label:
+ if (counter)
+ {
+ sleep(1);
+ goto loop_label;
+ }
}
+ else
+ {
+ MYSQL *sock= 0;
+ if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
+ {
+ free_defaults(argv_to_free);
+ return(1); /* purecov: deadcode */
+ }
- if (lock_tables)
- lock_table(sock, argc, argv);
- for (; *argv != NULL; argv++)
- if ((error=write_to_table(*argv, sock)))
- if (exitcode == 0)
- exitcode = error;
- db_disconnect(current_host, sock);
+ if (mysql_query(sock, "set @@character_set_database=binary;"))
+ {
+ db_error(sock); /* We shall countinue here, if --force was given */
+ return(1);
+ }
+
+ if (lock_tables)
+ lock_table(sock, argc, argv);
+ for (; *argv != NULL; argv++)
+ if ((error=write_to_table(*argv, sock)))
+ if (exitcode == 0)
+ exitcode = error;
+ db_disconnect(current_host, sock);
+ }
my_free(opt_password,MYF(MY_ALLOW_ZERO_PTR));
#ifdef HAVE_SMEM
my_free(shared_memory_base_name,MYF(MY_ALLOW_ZERO_PTR));
diff --git a/mysql-test/r/mysqldump.result b/mysql-test/r/mysqldump.result
index 79b22964f8a..7328a7da2eb 100644
--- a/mysql-test/r/mysqldump.result
+++ b/mysql-test/r/mysqldump.result
@@ -2650,3 +2650,35 @@ DELIMITER ;
DROP TRIGGER tr1;
DROP TABLE t1;
+create table t1 (a text , b text);
+create table t2 (a text , b text);
+insert t1 values ("Duck, Duck", "goose");
+insert t1 values ("Duck, Duck", "pidgeon");
+insert t2 values ("We the people", "in order to perform");
+insert t2 values ("a more perfect", "union");
+select * from t1;
+a b
+Duck, Duck goose
+Duck, Duck pidgeon
+select * from t2;
+a b
+We the people in order to perform
+a more perfect union
+test.t1: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+test.t2: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+test.t1: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+test.t2: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+select * from t1;
+a b
+Duck, Duck goose
+Duck, Duck pidgeon
+Duck, Duck goose
+Duck, Duck pidgeon
+select * from t2;
+a b
+We the people in order to perform
+a more perfect union
+We the people in order to perform
+a more perfect union
+drop table t1;
+drop table t2;
diff --git a/mysql-test/t/mysqldump.test b/mysql-test/t/mysqldump.test
index 592bc289d05..dea8e32869e 100644
--- a/mysql-test/t/mysqldump.test
+++ b/mysql-test/t/mysqldump.test
@@ -1048,3 +1048,27 @@ SET SQL_MODE = @old_sql_mode;
DROP TRIGGER tr1;
DROP TABLE t1;
+
+#
+# Added for use-thread option
+#
+create table t1 (a text , b text);
+create table t2 (a text , b text);
+insert t1 values ("Duck, Duck", "goose");
+insert t1 values ("Duck, Duck", "pidgeon");
+insert t2 values ("We the people", "in order to perform");
+insert t2 values ("a more perfect", "union");
+select * from t1;
+select * from t2;
+--exec $MYSQL_DUMP --tab=$MYSQLTEST_VARDIR/tmp/ test
+--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/t1.sql
+--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/t2.sql
+# The first load tests the pausing code
+--exec $MYSQL_IMPORT --use-threads=1 test $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
+# Now we test with multiple threads!
+--exec $MYSQL_IMPORT --use-threads=5 test $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
+select * from t1;
+select * from t2;
+
+drop table t1;
+drop table t2;