summaryrefslogtreecommitdiff
path: root/extra/mariabackup/xbstream.c
diff options
context:
space:
mode:
Diffstat (limited to 'extra/mariabackup/xbstream.c')
-rw-r--r--extra/mariabackup/xbstream.c257
1 files changed, 207 insertions, 50 deletions
diff --git a/extra/mariabackup/xbstream.c b/extra/mariabackup/xbstream.c
index 9990b00ea4b..2cc47ec7273 100644
--- a/extra/mariabackup/xbstream.c
+++ b/extra/mariabackup/xbstream.c
@@ -22,10 +22,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#include <my_base.h>
#include <my_getopt.h>
#include <hash.h>
+#include <my_pthread.h>
#include "common.h"
#include "xbstream.h"
-#include "ds_local.h"
-#include "ds_stdout.h"
+#include "xbcrypt_common.h"
+#include "datasink.h"
+#include "ds_decrypt.h"
+#include "crc_glue.h"
#define XBSTREAM_VERSION "1.0"
#define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL)
@@ -38,6 +41,12 @@ typedef enum {
RUN_MODE_EXTRACT
} run_mode_t;
+const char *xbstream_encrypt_algo_names[] =
+{ "NONE", "AES128", "AES192", "AES256", NullS};
+TYPELIB xbstream_encrypt_algo_typelib=
+{array_elements(xbstream_encrypt_algo_names)-1,"",
+ xbstream_encrypt_algo_names, NULL};
+
/* Need the following definitions to avoid linking with ds_*.o and their link
dependencies */
datasink_t datasink_archive;
@@ -47,9 +56,18 @@ datasink_t datasink_tmpfile;
datasink_t datasink_encrypt;
datasink_t datasink_buffer;
-static run_mode_t opt_mode;
+static run_mode_t opt_mode;
static char * opt_directory = NULL;
static my_bool opt_verbose = 0;
+static int opt_parallel = 1;
+static ulong opt_encrypt_algo;
+static char *opt_encrypt_key_file = NULL;
+static void *opt_encrypt_key = NULL;
+static int opt_encrypt_threads = 1;
+
+enum {
+ OPT_ENCRYPT_THREADS = 256
+};
static struct my_option my_long_options[] =
{
@@ -65,21 +83,46 @@ static struct my_option my_long_options[] =
GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"parallel", 'p', "Number of worker threads for reading / writing.",
+ &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG,
+ 1, 1, INT_MAX, 0, 0, 0},
+ {"decrypt", 'd', "Decrypt files ending with .xbcrypt.",
+ &opt_encrypt_algo, &opt_encrypt_algo, &xbstream_encrypt_algo_typelib,
+ GET_ENUM, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"encrypt-key", 'k', "Encryption key.",
+ &opt_encrypt_key, &opt_encrypt_key, 0,
+ GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"encrypt-key-file", 'f', "File which contains encryption key.",
+ &opt_encrypt_key_file, &opt_encrypt_key_file, 0,
+ GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"encrypt-threads", OPT_ENCRYPT_THREADS,
+ "Number of threads for parallel data encryption. "
+ "The default value is 1.",
+ &opt_encrypt_threads, &opt_encrypt_threads,
+ 0, GET_INT, REQUIRED_ARG, 1, 1, INT_MAX, 0, 0, 0},
{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
typedef struct {
+ HASH *filehash;
+ xb_rstream_t *stream;
+ ds_ctxt_t *ds_ctxt;
+ ds_ctxt_t *ds_decrypt_ctxt;
+ pthread_mutex_t *mutex;
+} extract_ctxt_t;
+
+typedef struct {
char *path;
uint pathlen;
my_off_t offset;
- ds_ctxt_t *ds_ctxt;
ds_file_t *file;
+ pthread_mutex_t mutex;
} file_entry_t;
static int get_options(int *argc, char ***argv);
static int mode_create(int argc, char **argv);
-static int mode_extract(int argc, char **argv);
+static int mode_extract(int n_threads, int argc, char **argv);
static my_bool get_one_option(int optid, const struct my_option *opt,
char *argument);
@@ -88,6 +131,8 @@ main(int argc, char **argv)
{
MY_INIT(argv[0]);
+ crc_init();
+
if (get_options(&argc, &argv)) {
goto err;
}
@@ -104,7 +149,8 @@ main(int argc, char **argv)
if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) {
goto err;
- } else if (opt_mode == RUN_MODE_EXTRACT && mode_extract(argc, argv)) {
+ } else if (opt_mode == RUN_MODE_EXTRACT &&
+ mode_extract(opt_parallel, argc, argv)) {
goto err;
}
@@ -302,9 +348,22 @@ err:
return 1;
}
+/************************************************************************
+Check if string ends with given suffix.
+@return true if string ends with given suffix. */
+static
+my_bool
+ends_with(const char *str, const char *suffix)
+{
+ size_t suffix_len = strlen(suffix);
+ size_t str_len = strlen(str);
+ return(str_len >= suffix_len
+ && strcmp(str + str_len - suffix_len, suffix) == 0);
+}
+
static
file_entry_t *
-file_entry_new(ds_ctxt_t *ds_ctxt, const char *path, uint pathlen)
+file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen)
{
file_entry_t *entry;
ds_file_t *file;
@@ -321,7 +380,11 @@ file_entry_new(ds_ctxt_t *ds_ctxt, const char *path, uint pathlen)
}
entry->pathlen = pathlen;
- file = ds_open(ds_ctxt, path, NULL);
+ if (ctxt->ds_decrypt_ctxt && ends_with(path, ".xbcrypt")) {
+ file = ds_open(ctxt->ds_decrypt_ctxt, path, NULL);
+ } else {
+ file = ds_open(ctxt->ds_ctxt, path, NULL);
+ }
if (file == NULL) {
msg("%s: failed to create file.\n", my_progname);
goto err;
@@ -332,7 +395,8 @@ file_entry_new(ds_ctxt_t *ds_ctxt, const char *path, uint pathlen)
}
entry->file = file;
- entry->ds_ctxt = ds_ctxt;
+
+ pthread_mutex_init(&entry->mutex, NULL);
return entry;
@@ -358,68 +422,77 @@ static
void
file_entry_free(file_entry_t *entry)
{
+ pthread_mutex_destroy(&entry->mutex);
ds_close(entry->file);
my_free(entry->path);
my_free(entry);
}
static
-int
-mode_extract(int argc __attribute__((unused)),
- char **argv __attribute__((unused)))
+void *
+extract_worker_thread_func(void *arg)
{
- xb_rstream_t *stream;
- xb_rstream_result_t res;
xb_rstream_chunk_t chunk;
- HASH filehash;
file_entry_t *entry;
- ds_ctxt_t *ds_ctxt;
+ xb_rstream_result_t res;
- stream = xb_stream_read_new();
- if (stream == NULL) {
- msg("%s: xb_stream_read_new() failed.\n", my_progname);
- return 1;
- }
+ extract_ctxt_t *ctxt = (extract_ctxt_t *) arg;
- /* If --directory is specified, it is already set as CWD by now. */
- ds_ctxt = ds_create(".", DS_TYPE_LOCAL);
+ my_thread_init();
- if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE,
- 0, 0, (my_hash_get_key) get_file_entry_key,
- (my_hash_free_key) file_entry_free, MYF(0))) {
- msg("%s: failed to initialize file hash.\n", my_progname);
- goto err;
- }
+ memset(&chunk, 0, sizeof(chunk));
+
+ while (1) {
+
+ pthread_mutex_lock(ctxt->mutex);
+ res = xb_stream_read_chunk(ctxt->stream, &chunk);
+
+ if (res != XB_STREAM_READ_CHUNK) {
+ pthread_mutex_unlock(ctxt->mutex);
+ break;
+ }
- while ((res = xb_stream_read_chunk(stream, &chunk)) ==
- XB_STREAM_READ_CHUNK) {
/* If unknown type and ignorable flag is set, skip this chunk */
if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \
!(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) {
+ pthread_mutex_unlock(ctxt->mutex);
continue;
}
/* See if we already have this file open */
- entry = (file_entry_t *) my_hash_search(&filehash,
+ entry = (file_entry_t *) my_hash_search(ctxt->filehash,
(uchar *) chunk.path,
chunk.pathlen);
if (entry == NULL) {
- entry = file_entry_new(ds_ctxt, chunk.path,
+ entry = file_entry_new(ctxt,
+ chunk.path,
chunk.pathlen);
if (entry == NULL) {
- goto err;
+ pthread_mutex_unlock(ctxt->mutex);
+ break;
}
- if (my_hash_insert(&filehash, (uchar *) entry)) {
+ if (my_hash_insert(ctxt->filehash, (uchar *) entry)) {
msg("%s: my_hash_insert() failed.\n",
my_progname);
- goto err;
+ pthread_mutex_unlock(ctxt->mutex);
+ break;
}
}
- if (chunk.type == XB_CHUNK_TYPE_EOF) {
- my_hash_delete(&filehash, (uchar *) entry);
+ pthread_mutex_lock(&entry->mutex);
+
+ pthread_mutex_unlock(ctxt->mutex);
+ res = xb_stream_validate_checksum(&chunk);
+
+ if (res != XB_STREAM_READ_CHUNK) {
+ pthread_mutex_unlock(&entry->mutex);
+ break;
+ }
+
+ if (chunk.type == XB_CHUNK_TYPE_EOF) {
+ pthread_mutex_unlock(&entry->mutex);
continue;
}
@@ -427,30 +500,114 @@ mode_extract(int argc __attribute__((unused)),
msg("%s: out-of-order chunk: real offset = 0x%llx, "
"expected offset = 0x%llx\n", my_progname,
chunk.offset, entry->offset);
- goto err;
+ pthread_mutex_unlock(&entry->mutex);
+ res = XB_STREAM_READ_ERROR;
+ break;
}
if (ds_write(entry->file, chunk.data, chunk.length)) {
msg("%s: my_write() failed.\n", my_progname);
- goto err;
+ pthread_mutex_unlock(&entry->mutex);
+ res = XB_STREAM_READ_ERROR;
+ break;
}
entry->offset += chunk.length;
- };
- if (res == XB_STREAM_READ_ERROR) {
- goto err;
+ pthread_mutex_unlock(&entry->mutex);
}
- my_hash_free(&filehash);
- ds_destroy(ds_ctxt);
- xb_stream_read_done(stream);
+ if (chunk.data)
+ my_free(chunk.data);
+
+ my_thread_end();
+
+ return (void *)(res);
+}
+
+
+static
+int
+mode_extract(int n_threads, int argc __attribute__((unused)),
+ char **argv __attribute__((unused)))
+{
+ xb_rstream_t *stream = NULL;
+ HASH filehash;
+ ds_ctxt_t *ds_ctxt = NULL;
+ ds_ctxt_t *ds_decrypt_ctxt = NULL;
+ extract_ctxt_t ctxt;
+ int i;
+ pthread_t *tids = NULL;
+ void **retvals = NULL;
+ pthread_mutex_t mutex;
+ int ret = 0;
+
+ if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE,
+ 0, 0, (my_hash_get_key) get_file_entry_key,
+ (my_hash_free_key) file_entry_free, MYF(0))) {
+ msg("%s: failed to initialize file hash.\n", my_progname);
+ return 1;
+ }
+
+ if (pthread_mutex_init(&mutex, NULL)) {
+ msg("%s: failed to initialize mutex.\n", my_progname);
+ my_hash_free(&filehash);
+ return 1;
+ }
+
+ /* If --directory is specified, it is already set as CWD by now. */
+ ds_ctxt = ds_create(".", DS_TYPE_LOCAL);
+ if (ds_ctxt == NULL) {
+ ret = 1;
+ goto exit;
+ }
+
+
+ stream = xb_stream_read_new();
+ if (stream == NULL) {
+ msg("%s: xb_stream_read_new() failed.\n", my_progname);
+ pthread_mutex_destroy(&mutex);
+ ret = 1;
+ goto exit;
+ }
+
+ ctxt.stream = stream;
+ ctxt.filehash = &filehash;
+ ctxt.ds_ctxt = ds_ctxt;
+ ctxt.ds_decrypt_ctxt = ds_decrypt_ctxt;
+ ctxt.mutex = &mutex;
+
+ tids = malloc(sizeof(pthread_t) * n_threads);
+ retvals = malloc(sizeof(void*) * n_threads);
+
+ for (i = 0; i < n_threads; i++)
+ pthread_create(tids + i, NULL, extract_worker_thread_func,
+ &ctxt);
+
+ for (i = 0; i < n_threads; i++)
+ pthread_join(tids[i], retvals + i);
+
+ for (i = 0; i < n_threads; i++) {
+ if ((ulong)retvals[i] == XB_STREAM_READ_ERROR) {
+ ret = 1;
+ goto exit;
+ }
+ }
+
+exit:
+ pthread_mutex_destroy(&mutex);
+
+ free(tids);
+ free(retvals);
- return 0;
-err:
my_hash_free(&filehash);
- ds_destroy(ds_ctxt);
+ if (ds_ctxt != NULL) {
+ ds_destroy(ds_ctxt);
+ }
+ if (ds_decrypt_ctxt) {
+ ds_destroy(ds_decrypt_ctxt);
+ }
xb_stream_read_done(stream);
- return 1;
+ return ret;
}