diff options
Diffstat (limited to 'extra/mariabackup/xbstream.c')
-rw-r--r-- | extra/mariabackup/xbstream.c | 257 |
1 files changed, 207 insertions, 50 deletions
diff --git a/extra/mariabackup/xbstream.c b/extra/mariabackup/xbstream.c index 9990b00ea4b..2cc47ec7273 100644 --- a/extra/mariabackup/xbstream.c +++ b/extra/mariabackup/xbstream.c @@ -22,10 +22,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA #include <my_base.h> #include <my_getopt.h> #include <hash.h> +#include <my_pthread.h> #include "common.h" #include "xbstream.h" -#include "ds_local.h" -#include "ds_stdout.h" +#include "xbcrypt_common.h" +#include "datasink.h" +#include "ds_decrypt.h" +#include "crc_glue.h" #define XBSTREAM_VERSION "1.0" #define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL) @@ -38,6 +41,12 @@ typedef enum { RUN_MODE_EXTRACT } run_mode_t; +const char *xbstream_encrypt_algo_names[] = +{ "NONE", "AES128", "AES192", "AES256", NullS}; +TYPELIB xbstream_encrypt_algo_typelib= +{array_elements(xbstream_encrypt_algo_names)-1,"", + xbstream_encrypt_algo_names, NULL}; + /* Need the following definitions to avoid linking with ds_*.o and their link dependencies */ datasink_t datasink_archive; @@ -47,9 +56,18 @@ datasink_t datasink_tmpfile; datasink_t datasink_encrypt; datasink_t datasink_buffer; -static run_mode_t opt_mode; +static run_mode_t opt_mode; static char * opt_directory = NULL; static my_bool opt_verbose = 0; +static int opt_parallel = 1; +static ulong opt_encrypt_algo; +static char *opt_encrypt_key_file = NULL; +static void *opt_encrypt_key = NULL; +static int opt_encrypt_threads = 1; + +enum { + OPT_ENCRYPT_THREADS = 256 +}; static struct my_option my_long_options[] = { @@ -65,21 +83,46 @@ static struct my_option my_long_options[] = GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"parallel", 'p', "Number of worker threads for reading / writing.", + &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG, + 1, 1, INT_MAX, 0, 0, 0}, + {"decrypt", 'd', "Decrypt files ending with .xbcrypt.", + &opt_encrypt_algo, &opt_encrypt_algo, &xbstream_encrypt_algo_typelib, + GET_ENUM, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"encrypt-key", 'k', "Encryption key.", + &opt_encrypt_key, &opt_encrypt_key, 0, + GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"encrypt-key-file", 'f', "File which contains encryption key.", + &opt_encrypt_key_file, &opt_encrypt_key_file, 0, + GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"encrypt-threads", OPT_ENCRYPT_THREADS, + "Number of threads for parallel data encryption. " + "The default value is 1.", + &opt_encrypt_threads, &opt_encrypt_threads, + 0, GET_INT, REQUIRED_ARG, 1, 1, INT_MAX, 0, 0, 0}, {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} }; typedef struct { + HASH *filehash; + xb_rstream_t *stream; + ds_ctxt_t *ds_ctxt; + ds_ctxt_t *ds_decrypt_ctxt; + pthread_mutex_t *mutex; +} extract_ctxt_t; + +typedef struct { char *path; uint pathlen; my_off_t offset; - ds_ctxt_t *ds_ctxt; ds_file_t *file; + pthread_mutex_t mutex; } file_entry_t; static int get_options(int *argc, char ***argv); static int mode_create(int argc, char **argv); -static int mode_extract(int argc, char **argv); +static int mode_extract(int n_threads, int argc, char **argv); static my_bool get_one_option(int optid, const struct my_option *opt, char *argument); @@ -88,6 +131,8 @@ main(int argc, char **argv) { MY_INIT(argv[0]); + crc_init(); + if (get_options(&argc, &argv)) { goto err; } @@ -104,7 +149,8 @@ main(int argc, char **argv) if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) { goto err; - } else if (opt_mode == RUN_MODE_EXTRACT && mode_extract(argc, argv)) { + } else if (opt_mode == RUN_MODE_EXTRACT && + mode_extract(opt_parallel, argc, argv)) { goto err; } @@ -302,9 +348,22 @@ err: return 1; } +/************************************************************************ +Check if string ends with given suffix. +@return true if string ends with given suffix. */ +static +my_bool +ends_with(const char *str, const char *suffix) +{ + size_t suffix_len = strlen(suffix); + size_t str_len = strlen(str); + return(str_len >= suffix_len + && strcmp(str + str_len - suffix_len, suffix) == 0); +} + static file_entry_t * -file_entry_new(ds_ctxt_t *ds_ctxt, const char *path, uint pathlen) +file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen) { file_entry_t *entry; ds_file_t *file; @@ -321,7 +380,11 @@ file_entry_new(ds_ctxt_t *ds_ctxt, const char *path, uint pathlen) } entry->pathlen = pathlen; - file = ds_open(ds_ctxt, path, NULL); + if (ctxt->ds_decrypt_ctxt && ends_with(path, ".xbcrypt")) { + file = ds_open(ctxt->ds_decrypt_ctxt, path, NULL); + } else { + file = ds_open(ctxt->ds_ctxt, path, NULL); + } if (file == NULL) { msg("%s: failed to create file.\n", my_progname); goto err; @@ -332,7 +395,8 @@ file_entry_new(ds_ctxt_t *ds_ctxt, const char *path, uint pathlen) } entry->file = file; - entry->ds_ctxt = ds_ctxt; + + pthread_mutex_init(&entry->mutex, NULL); return entry; @@ -358,68 +422,77 @@ static void file_entry_free(file_entry_t *entry) { + pthread_mutex_destroy(&entry->mutex); ds_close(entry->file); my_free(entry->path); my_free(entry); } static -int -mode_extract(int argc __attribute__((unused)), - char **argv __attribute__((unused))) +void * +extract_worker_thread_func(void *arg) { - xb_rstream_t *stream; - xb_rstream_result_t res; xb_rstream_chunk_t chunk; - HASH filehash; file_entry_t *entry; - ds_ctxt_t *ds_ctxt; + xb_rstream_result_t res; - stream = xb_stream_read_new(); - if (stream == NULL) { - msg("%s: xb_stream_read_new() failed.\n", my_progname); - return 1; - } + extract_ctxt_t *ctxt = (extract_ctxt_t *) arg; - /* If --directory is specified, it is already set as CWD by now. */ - ds_ctxt = ds_create(".", DS_TYPE_LOCAL); + my_thread_init(); - if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE, - 0, 0, (my_hash_get_key) get_file_entry_key, - (my_hash_free_key) file_entry_free, MYF(0))) { - msg("%s: failed to initialize file hash.\n", my_progname); - goto err; - } + memset(&chunk, 0, sizeof(chunk)); + + while (1) { + + pthread_mutex_lock(ctxt->mutex); + res = xb_stream_read_chunk(ctxt->stream, &chunk); + + if (res != XB_STREAM_READ_CHUNK) { + pthread_mutex_unlock(ctxt->mutex); + break; + } - while ((res = xb_stream_read_chunk(stream, &chunk)) == - XB_STREAM_READ_CHUNK) { /* If unknown type and ignorable flag is set, skip this chunk */ if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \ !(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) { + pthread_mutex_unlock(ctxt->mutex); continue; } /* See if we already have this file open */ - entry = (file_entry_t *) my_hash_search(&filehash, + entry = (file_entry_t *) my_hash_search(ctxt->filehash, (uchar *) chunk.path, chunk.pathlen); if (entry == NULL) { - entry = file_entry_new(ds_ctxt, chunk.path, + entry = file_entry_new(ctxt, + chunk.path, chunk.pathlen); if (entry == NULL) { - goto err; + pthread_mutex_unlock(ctxt->mutex); + break; } - if (my_hash_insert(&filehash, (uchar *) entry)) { + if (my_hash_insert(ctxt->filehash, (uchar *) entry)) { msg("%s: my_hash_insert() failed.\n", my_progname); - goto err; + pthread_mutex_unlock(ctxt->mutex); + break; } } - if (chunk.type == XB_CHUNK_TYPE_EOF) { - my_hash_delete(&filehash, (uchar *) entry); + pthread_mutex_lock(&entry->mutex); + + pthread_mutex_unlock(ctxt->mutex); + res = xb_stream_validate_checksum(&chunk); + + if (res != XB_STREAM_READ_CHUNK) { + pthread_mutex_unlock(&entry->mutex); + break; + } + + if (chunk.type == XB_CHUNK_TYPE_EOF) { + pthread_mutex_unlock(&entry->mutex); continue; } @@ -427,30 +500,114 @@ mode_extract(int argc __attribute__((unused)), msg("%s: out-of-order chunk: real offset = 0x%llx, " "expected offset = 0x%llx\n", my_progname, chunk.offset, entry->offset); - goto err; + pthread_mutex_unlock(&entry->mutex); + res = XB_STREAM_READ_ERROR; + break; } if (ds_write(entry->file, chunk.data, chunk.length)) { msg("%s: my_write() failed.\n", my_progname); - goto err; + pthread_mutex_unlock(&entry->mutex); + res = XB_STREAM_READ_ERROR; + break; } entry->offset += chunk.length; - }; - if (res == XB_STREAM_READ_ERROR) { - goto err; + pthread_mutex_unlock(&entry->mutex); } - my_hash_free(&filehash); - ds_destroy(ds_ctxt); - xb_stream_read_done(stream); + if (chunk.data) + my_free(chunk.data); + + my_thread_end(); + + return (void *)(res); +} + + +static +int +mode_extract(int n_threads, int argc __attribute__((unused)), + char **argv __attribute__((unused))) +{ + xb_rstream_t *stream = NULL; + HASH filehash; + ds_ctxt_t *ds_ctxt = NULL; + ds_ctxt_t *ds_decrypt_ctxt = NULL; + extract_ctxt_t ctxt; + int i; + pthread_t *tids = NULL; + void **retvals = NULL; + pthread_mutex_t mutex; + int ret = 0; + + if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE, + 0, 0, (my_hash_get_key) get_file_entry_key, + (my_hash_free_key) file_entry_free, MYF(0))) { + msg("%s: failed to initialize file hash.\n", my_progname); + return 1; + } + + if (pthread_mutex_init(&mutex, NULL)) { + msg("%s: failed to initialize mutex.\n", my_progname); + my_hash_free(&filehash); + return 1; + } + + /* If --directory is specified, it is already set as CWD by now. */ + ds_ctxt = ds_create(".", DS_TYPE_LOCAL); + if (ds_ctxt == NULL) { + ret = 1; + goto exit; + } + + + stream = xb_stream_read_new(); + if (stream == NULL) { + msg("%s: xb_stream_read_new() failed.\n", my_progname); + pthread_mutex_destroy(&mutex); + ret = 1; + goto exit; + } + + ctxt.stream = stream; + ctxt.filehash = &filehash; + ctxt.ds_ctxt = ds_ctxt; + ctxt.ds_decrypt_ctxt = ds_decrypt_ctxt; + ctxt.mutex = &mutex; + + tids = malloc(sizeof(pthread_t) * n_threads); + retvals = malloc(sizeof(void*) * n_threads); + + for (i = 0; i < n_threads; i++) + pthread_create(tids + i, NULL, extract_worker_thread_func, + &ctxt); + + for (i = 0; i < n_threads; i++) + pthread_join(tids[i], retvals + i); + + for (i = 0; i < n_threads; i++) { + if ((ulong)retvals[i] == XB_STREAM_READ_ERROR) { + ret = 1; + goto exit; + } + } + +exit: + pthread_mutex_destroy(&mutex); + + free(tids); + free(retvals); - return 0; -err: my_hash_free(&filehash); - ds_destroy(ds_ctxt); + if (ds_ctxt != NULL) { + ds_destroy(ds_ctxt); + } + if (ds_decrypt_ctxt) { + ds_destroy(ds_decrypt_ctxt); + } xb_stream_read_done(stream); - return 1; + return ret; } |