diff options
Diffstat (limited to 'src/rep/rep_backup.c')
| -rw-r--r-- | src/rep/rep_backup.c | 2148 |
1 files changed, 2025 insertions, 123 deletions
diff --git a/src/rep/rep_backup.c b/src/rep/rep_backup.c index cfde7622..14bc63bb 100644 --- a/src/rep/rep_backup.c +++ b/src/rep/rep_backup.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -9,6 +9,7 @@ #include "db_config.h" #include "db_int.h" +#include "dbinc/blob.h" #include "dbinc/db_page.h" #include "dbinc/db_am.h" #include "dbinc/fop.h" @@ -26,21 +27,45 @@ * Note that the fileinfo for the first file in the list always appears at * (constant) offset __REP_UPDATE_SIZE in the buffer. */ +#define FILE_CTX_INMEM_ONLY 0x01 typedef struct { u_int8_t *buf; /* Buffer base address. */ u_int32_t size; /* Total allocated buffer size. */ u_int8_t *fillptr; /* Pointer to first unused space. */ u_int32_t count; /* Number of entries currently in list. */ u_int32_t version; /* Rep version of marshaled format. */ + u_int32_t flags; /* Context flags. */ } FILE_LIST_CTX; #define FIRST_FILE_PTR(buf) ((buf) + __REP_UPDATE_SIZE) /* + * Flags used to show the state of blob files on the master in messages + * sent to the client. + */ +#define BLOB_DONE 0x01 +#define BLOB_DELETE 0x02 +#define BLOB_CHUNK_FAIL 0x04 + +#define BLOB_ID_SIZE sizeof(db_seq_t) +#define BLOB_KEY_SIZE (2 * BLOB_ID_SIZE) + +/* * Function that performs any desired processing on a single file, as part of * the traversal of a list of database files, such as with internal init. 
*/ typedef int (FILE_WALK_FN) __P((ENV *, __rep_fileinfo_args *, void *)); +static int __rep_add_files_to_list __P(( + ENV *, const char *, const char *, FILE_LIST_CTX *, const char **, int)); +static int __rep_blob_chunk_gap + __P((ENV *, int, DB_THREAD_INFO *, REP *, int *, db_seq_t, int)); +static int __rep_blob_cleanup __P((ENV *, REP *)); +static int __rep_blobdone + __P((ENV *, int, DB_THREAD_INFO *, REP *, db_seq_t, int)); +static int __rep_blob_find_files __P((ENV *, DB_THREAD_INFO *, const char *, + db_seq_t *, db_seq_t, db_seq_t, db_seq_t *, DBT *, size_t *, u_int32_t *)); +static int __rep_blob_sort_dirs __P((ENV *, + int (*)(const char *), char **, int, char ***, int *)); static FILE_WALK_FN __rep_check_uid; static int __rep_clean_interrupted __P((ENV *)); static FILE_WALK_FN __rep_cleanup_nimdbs; @@ -52,6 +77,8 @@ static int __rep_get_fileinfo __P((ENV *, const char *, const char *, __rep_fileinfo_args *, u_int8_t *)); static int __rep_get_file_list __P((ENV *, DB_FH *, u_int32_t, u_int32_t *, DBT *)); +static int __rep_init_file_list_context __P((ENV *, + u_int32_t, u_int32_t, int, FILE_LIST_CTX *)); static int __rep_is_replicated_db __P((const char *, const char *)); static int __rep_log_setup __P((ENV *, REP *, u_int32_t, u_int32_t, DB_LSN *)); @@ -72,9 +99,12 @@ static FILE_WALK_FN __rep_remove_file; static int __rep_remove_logs __P((ENV *)); static int __rep_remove_nimdbs __P((ENV *)); static int __rep_rollback __P((ENV *, DB_LSN *)); +static int __rep_select_blob_file __P((const char *)); +static int __rep_select_blob_sdb __P((const char *)); static int __rep_unlink_by_list __P((ENV *, u_int32_t, u_int8_t *, u_int32_t, u_int32_t)); static FILE_WALK_FN __rep_unlink_file; +static int __rep_walk_blob_dir __P((ENV *, FILE_LIST_CTX*)); static int __rep_walk_filelist __P((ENV *, u_int32_t, u_int8_t *, u_int32_t, u_int32_t, FILE_WALK_FN *, void *)); static int __rep_walk_dir __P((ENV *, const char *, const char *, @@ -129,14 +159,12 @@ 
__rep_update_req(env, rp) dblp = env->lg_handle; logc = NULL; - if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0) - goto err_noalloc; - context.size = MEGABYTE; - context.count = 0; - context.version = rp->rep_version; /* Reserve space for the update_args, and fill in file info. */ - context.fillptr = FIRST_FILE_PTR(context.buf); + if ((ret = __rep_init_file_list_context(env, rp->rep_version, + F_ISSET(rp, REPCTL_INMEM_ONLY) ? FILE_CTX_INMEM_ONLY : 0, + 1, &context)) != 0) + goto err_noalloc; if ((ret = __rep_find_dbs(env, &context)) != 0) goto err; @@ -214,6 +242,472 @@ err_noalloc: } /* + * Passed to the __rep_blob_sort_dirs function. + * Select blob files, of the form __db.bl### + */ +static int +__rep_select_blob_file(file) + const char *file; +{ + if (strncmp(BLOB_FILE_PREFIX, file, strlen(BLOB_FILE_PREFIX)) == 0) + return (1); + else + return (0); +} + +/* + * Passed to the __rep_blob_sort_dirs function. + * Select blob subdatabase directories, of the form __db### + */ +static int +__rep_select_blob_sdb(file) + const char *file; +{ + if (strncmp(BLOB_DIR_PREFIX, file, strlen(BLOB_DIR_PREFIX)) == 0 && + strncmp(BLOB_FILE_PREFIX, file, strlen(BLOB_FILE_PREFIX)) != 0 && + strcmp(BLOB_META_FILE_NAME, file) != 0) + return (1); + else + return (0); +} + +/* + * __rep_blob_sort_dirs + * Create a sorted list of directory names that all share a type that + * is selected using the given function. 
+ */ +static int +__rep_blob_sort_dirs(env, select_fn, dirs, dirs_cnt, sorted, sorted_cnt) + ENV *env; + int (*select_fn) __P((const char *)); + char **dirs; + int dirs_cnt; + char ***sorted; + int *sorted_cnt; +{ + char **sort, *tmp; + int i, ret, size, sort_cnt, swapped; + + *sorted = NULL; + *sorted_cnt = 0; + sort_cnt = 0; + + if ((ret = __os_malloc(env, + (sizeof(char *) * (unsigned int)dirs_cnt), &sort)) != 0) + return (ret); + + for (i = 0; i < dirs_cnt; i++) { + if (select_fn(dirs[i])) { + sort[sort_cnt] = dirs[i]; + sort_cnt++; + } + } + + /* + * Directories are usually returned in order, or close to it, so use + * Bubble Sort to sort the list. + */ + size = sort_cnt; + swapped = 1; + while (swapped == 1 && size > 1) { + swapped = 0; + for (i = 0; (i + 1) < size; i++) { + if (strcmp(sort[i], sort[i+1]) > 0) { + tmp = sort[i]; + sort[i] = sort[i+1]; + sort[i+1] = tmp; + swapped = 1; + } + } + size--; + } + + *sorted = sort; + *sorted_cnt = sort_cnt; + + return (0); +} + +#define BLOB_THROTTLE_DEFAULT (10 * MEGABYTE) + +/* + * __rep_blob_update_req + * Send a list of blob files, starting after the blob id and sub-database + * id sent in the BLOB_UPDATE_REQ message. 
+ * + * PUBLIC: int __rep_blob_update_req __P((ENV *, DB_THREAD_INFO *, DBT *)); + */ +int +__rep_blob_update_req(env, ip, rec) + ENV *env; + DB_THREAD_INFO *ip; + DBT *rec; +{ + DBT rbudbt; + REP *rep; + __rep_blob_update_args rbu; + __rep_blob_update_req_args rbur; + db_seq_t blob_fid, blob_id, blob_sdb, tmp; + int cur, dirs_cnt, ret, sdb_cnt; + size_t sent; + char *blob_sub_dir, *dir, **dirs, **sdb; + u_int32_t num_blobs, throttle; + u_int8_t *ptr; + + memset(&rbu, 0, sizeof(__rep_blob_update_args)); + memset(&rbudbt, 0, sizeof(DBT)); + blob_sub_dir = dir = NULL; + dirs = sdb = NULL; + sent = 0; + num_blobs = 0; + cur = dirs_cnt = sdb_cnt = 0; + rep = env->rep_handle->region; + throttle = rep->gbytes * GIGABYTE + rep->bytes; + if (throttle == 0) + throttle = BLOB_THROTTLE_DEFAULT; + + if ((ret = __rep_blob_update_req_unmarshal( + env, &rbur, rec->data, rec->size, &ptr)) != 0) + goto err; + + RPRINT(env, (env, DB_VERB_REP_SYNC, +"blob_update_req: file_id %llu sdb_id %llu blob_id %llu highest %llu", + (long long)rbur.blob_fid, (long long)rbur.blob_sid, + (long long)rbur.blob_id, (long long)rbur.highest_id)); + + rbu.blob_fid = rbur.blob_fid; + + if ((ret = __os_malloc(env, MEGABYTE, &rbudbt.data)) != 0) + goto err; + rbudbt.ulen = MEGABYTE; + rbudbt.size = __REP_BLOB_UPDATE_SIZE; + + blob_fid = (db_seq_t)rbur.blob_fid; + blob_sdb = (db_seq_t)rbur.blob_sid; + blob_id = (db_seq_t)rbur.blob_id; + + /* Find the first blob file if it is unknown. */ + if (blob_id == 0 && blob_sdb == 0) { +find_sdb: if (dirs == NULL) { + if ((ret = __blob_make_sub_dir( + env, &blob_sub_dir, blob_fid, 0)) != 0) + goto err; + if ((ret = __db_appname( + env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0) + goto err; + /* If no directory, there are no blobs to send. 
*/ + if (__os_exists(env, dir, NULL) != 0) + goto filedone; + + if ((ret = __os_dirlist( + env, dir, 1, &dirs, &dirs_cnt)) != 0) + goto err; + + if (dirs_cnt == 0) + goto filedone; + + if ((ret = __rep_blob_sort_dirs( + env, __rep_select_blob_sdb, + dirs, dirs_cnt, &sdb, &sdb_cnt)) != 0) + goto err; + } + /* + * Iterate through the list of subdirectories, until we find + * one that has an id larger than the current subdirectory id. + */ + while (cur < sdb_cnt) { + if ((ret = __blob_path_to_dir_ids( + env, sdb[cur], &tmp, NULL)) != 0) + goto err; + if (blob_sdb < tmp) { + blob_sdb = tmp; + break; + } + cur++; + } + /* Check if no more subdirectories to search */ + if (sdb_cnt != 0 && cur == sdb_cnt) + goto filedone; + if (dir != NULL) + __os_free(env, dir); + dir = NULL; + if (blob_sub_dir != NULL) + __os_free(env, blob_sub_dir); + blob_sub_dir = NULL; + } + + if (blob_sub_dir == NULL && (ret = + __blob_make_sub_dir(env, &blob_sub_dir, blob_fid, blob_sdb)) != 0) + goto err; + + if (dir == NULL && (ret = __db_appname( + env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0) + goto err; + /* Search the current directory for blob files with id > blob_id. */ + if ((ret = __rep_blob_find_files( + env, ip, dir, &blob_id, blob_sdb, blob_fid, + (db_seq_t *)&rbur.highest_id, &rbudbt, &sent, &num_blobs)) != 0) + goto err; + + /* + * If we have not reached the send limit, and there are still + * directories to search, then search the next directory. + */ + if (sent < throttle) { + if (blob_sdb != 0) { + rbur.highest_id = 0; + blob_id = 0; + __os_free(env, blob_sub_dir); + blob_sub_dir = NULL; + __os_free(env, dir); + dir = NULL; + goto find_sdb; + } else { + /* Mark as the end of the files. 
*/ +filedone: F_SET(&rbu, BLOB_DONE); + rbur.highest_id = 0; + } + } else + STAT(rep->stat.st_nthrottles++); + + rbu.num_blobs = num_blobs; + rbu.highest_id = rbur.highest_id; + __rep_blob_update_marshal(env, &rbu, rbudbt.data); + RPRINT(env, (env, DB_VERB_REP_SYNC, + "Sending blob_update: file_id %llu, num_blobs %lu, flags %lu", + (long long)rbu.blob_fid, + (long)num_blobs, (unsigned long)rbu.flags)); + (void)__rep_send_message( + env, DB_EID_BROADCAST, REP_BLOB_UPDATE, NULL, &rbudbt, 0, 0); + +err: if (sdb != NULL) + __os_free(env, sdb); + if (dirs != NULL) + __os_dirfree(env, dirs, dirs_cnt); + if (dir != NULL) + __os_free(env, dir); + if (blob_sub_dir != NULL) + __os_free(env, blob_sub_dir); + if (rbudbt.data != NULL) + __os_free(env, rbudbt.data); + return (ret); +} + +/* + * __rep_blob_find_files + * + * Search a directory for blob files, starting with the given blob id and + * sub-database id. Add information for each blob to the message buffer until + * there are no more files, or it has reached the maximum send amount in terms + * of combined blob files size. + * + * This search is complicated because the blobs have to be sent in order by id, + * but there can be huge holes between a blob file and the one with the next + * highest id, so iterating through the ids looking to see if the file exists + * for each id will take too long. The solution is to walk the directory + * hierarchy in order, reading every file in that directory, sorting them by + * id, and adding them to the update list. 
+ */ +static int +__rep_blob_find_files( + env, ip, dir, blob_id, blob_sid, blob_fid, highest, buf, sent, num) + ENV *env; + DB_THREAD_INFO *ip; + const char *dir; + db_seq_t *blob_id; + db_seq_t blob_sid; + db_seq_t blob_fid; + db_seq_t *highest; + DBT *buf; + size_t *sent; + u_int32_t *num; +{ + DB *bmd; + DB_FH *fhp; + DB_TXN *txn; + REP *rep; + __rep_blob_file_args rbf; + char blob_path[MAX_BLOB_PATH_SZ], **dirs, **files, *path, *ptr; + db_seq_t tmp; + int blob_path_len, cur, depth, dirs_cnt, files_cnt, ret; + off_t blob_size; + size_t len; + u_int32_t bytes, mbytes, throttle; + + bmd = NULL; + txn = NULL; + fhp = NULL; + path = NULL; + dirs = files = NULL; + dirs_cnt = files_cnt = 0; + rbf.blob_sid = (u_int64_t)blob_sid; + rep = env->rep_handle->region; + throttle = rep->gbytes * GIGABYTE + rep->bytes; + if (throttle == 0) + throttle = BLOB_THROTTLE_DEFAULT; + + if ((ret = __os_malloc( + env, strlen(dir) + MAX_BLOB_PATH_SZ, &path)) != 0) + goto err; + + /* + * Read the highest possible blob id from the blob meta database, so + * we know when to stop looking for files for this database. The + * highest value is reset every time we switch to a new subdatabase. + */ + if (*highest == 0) { + if ((ret = __db_create_internal(&bmd, env, 0)) != 0) + goto err; + + if ((ret = __txn_begin( + env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0) + goto err; + + bmd->blob_file_id = blob_fid; + bmd->blob_sdb_id = blob_sid; + if ((ret = __blob_highest_id(bmd, txn, highest) ) != 0) + goto err; + + if ((ret = __txn_abort(txn)) != 0) + goto err; + txn = NULL; + if ((ret = __db_close(bmd, NULL, 0)) != 0) + goto err; + bmd = NULL; + (*highest)++; + } + + (*blob_id)++; + while (*sent < throttle && *blob_id < *highest) { + memset(blob_path, 0, MAX_BLOB_PATH_SZ); + blob_path_len = depth = 0; + + /* Calculate the subdirectory from the blob id. 
*/ + __blob_calculate_dirs( + *blob_id, blob_path, &blob_path_len, &depth); + if (blob_path_len != 0) { + (void)sprintf(path, "%s%c%s%c", + dir, PATH_SEPARATOR[0], blob_path, PATH_SEPARATOR[0]); + } else + (void)sprintf(path, "%s", dir); + len = strlen(path); + + /* If the sub-directory does not exist, look for the next. */ + if (__os_exists(env, path, NULL) != 0) { + (*blob_id) += + BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS); + continue; + } + + /* Get a list of all the blob files, sorted by id. */ + if ((ret = __os_dirlist(env, path, 0, &dirs, &dirs_cnt)) != 0) + goto err; + + if ((ret = __rep_blob_sort_dirs(env, __rep_select_blob_file, + dirs, dirs_cnt, &files, &files_cnt)) != 0) + goto err; + + /* + * Find the first blob file with an id greater than or equal to + * the last id. + */ + for (cur = 0; cur < files_cnt; cur++) { + ptr = files[cur]; + ptr += strlen(BLOB_FILE_PREFIX); + if ((ret = __blob_str_to_id( + env, (const char **)&ptr, &tmp)) != 0) + goto err; + DB_ASSERT(env, tmp != 0); + if (tmp >= *blob_id) + break; + } + + /* Add each remaining blob file to the message buffer. */ + while (cur < files_cnt) { + /* Get the blob id from the current file name. */ + (void)sprintf(path + len, "%s", files[cur]); + ptr = path + len + strlen(BLOB_FILE_PREFIX); + if ((ret = __blob_str_to_id( + env, (const char **)&ptr, blob_id)) != 0) + goto err; + rbf.blob_id = (u_int64_t)*blob_id; + /* Open the file and get its size. 
*/ + if ((ret = __os_open( + env, path, 0, DB_OSO_RDONLY, 0, &fhp)) != 0) { + if (ret == ENOENT) { + ret = 0; + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_update blob file: %llu deleted, skipping.", + (long long)rbf.blob_id)); + cur++; + continue; + } + goto err; + } + if ((ret = __os_ioinfo( + env, path, fhp, &mbytes, &bytes, NULL)) != 0) + goto err; + if ((ret =__os_closehandle(env, fhp)) != 0) + goto err; + fhp = NULL; + blob_size = ((off_t)mbytes * (off_t)MEGABYTE) + bytes; + rbf.blob_size = (u_int64_t)blob_size; + if (blob_size > UINT32_MAX) + (*sent) = throttle + 1; + else { + if (((*sent) + (size_t)blob_size) < (*sent)) + (*sent) = throttle + 1; + else + (*sent) += (size_t)blob_size; + } + __rep_blob_file_marshal( + env, &rbf, (u_int8_t *)buf->data + buf->size); + (*num)++; + buf->size += __REP_BLOB_FILE_SIZE; + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_update adding: blob_sid %llu, blob_id %llu blob_size %llu", + (long long)rbf.blob_sid, + (long long)rbf.blob_id, (long long)rbf.blob_size)); + if ((*sent) > throttle) + goto err; + + /* Resize if there is not enough space to grow. */ + if (buf->size > (buf->ulen - __REP_BLOB_FILE_SIZE)) { + if ((ret = __os_realloc( + env, buf->ulen * 2, &buf->data)) != 0) + goto err; + buf->ulen *= 2; + } + cur++; + } + /* + * Move to the next directory of blob files by setting the blob + * id to the next largest possible value. 
+ */ + (*blob_id) += BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS); + __os_free(env, files); + files = NULL; + __os_dirfree(env, dirs, dirs_cnt); + dirs = NULL; + } +err: + if (path != NULL) + __os_free(env, path); + if (files != NULL) + __os_free(env, files); + if (dirs != NULL) + __os_dirfree(env, dirs, dirs_cnt); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (txn != NULL) + (void)__txn_abort(txn); + if (bmd != NULL) + (void)__db_close(bmd, NULL, 0); + + return (ret); +} + +/* * __rep_find_dbs - * Walk through all the named files/databases including those in the * environment or data_dirs and those that in named and in-memory. We @@ -240,7 +734,8 @@ __rep_find_dbs(env, context) * replicated user databases. If the application has a metadata_dir, * this will also find any persistent internal system databases. */ - if (dbenv->db_data_dir != NULL) { + if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && + dbenv->db_data_dir != NULL) { for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) { if ((ret = __db_appname(env, DB_APP_NONE, *ddir, NULL, &real_dir)) != 0) @@ -252,16 +747,24 @@ __rep_find_dbs(env, context) real_dir = NULL; } } + /* * Walk the environment directory. If the application doesn't * have a metadata_dir, this will return persistent internal system * databases. If the application doesn't have a separate data * directory, this will also return all user databases. */ - if (ret == 0) + if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0) ret = __rep_walk_dir(env, env->db_home, NULL, context); - /* Now, collect any in-memory named databases. */ + /* Gather the databases in the blob directory. */ + if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0) + ret = __rep_walk_blob_dir(env, context); + + /* + * Now, collect any in-memory named databases. We do this no + * matter if the INMEM_ONLY flag is set or not. 
+ */ if (ret == 0) ret = __rep_walk_dir(env, NULL, NULL, context); @@ -271,6 +774,148 @@ __rep_find_dbs(env, context) } /* + * __rep_walk_blob_dir -- + * + * The blob directory hierarchy consists of a top layer that contains the + * blob meta database (BMD) and a set of blob directories (BLDIR). + * Each BLDIR corresponds to a database file. If the database file doesn't + * contain subdatabases, the BLDIR contains a BMD and blob files. If the + * database file contains subdatabases, the BLDIR contains a BLSDIR + * subdirectory for each subdatabase. Each BLSDIR contains a BMD and blob + * files. + * + * This function walks the blob directory hierarchy and records any BMD. + * It first checks if the top level BMD exists, and if it does searches + * the first and second layers of the hierarchy for BMDs. + */ +static int +__rep_walk_blob_dir(env, context) + ENV *env; + FILE_LIST_CTX *context; +{ + int cnt, cnt2, i, j, ret; + size_t len; + char *blob_dir, *blob_sub, **dirs, *name, *name2, **subdirs; + char blob_sub_buf[MAX_BLOB_PATH_SZ]; + const char *bmd, *dirp; + + cnt = cnt2 = 0; + blob_dir = name = name2 = NULL; + dirs = subdirs = NULL; + bmd = BLOB_META_FILE_NAME; + blob_sub = blob_sub_buf; + + if ((ret = __db_appname( + env, DB_APP_BLOB, BLOB_META_FILE_NAME, &dirp, &name)) != 0) + goto err; + + /* + * If the main blob meta database does not exist, then no database in + * the environment supports blobs. + */ + if ((ret = __os_exists(env, name, NULL)) != 0) { + ret = 0; + goto err; + } + + /* Get the blob directory. 
*/ + if ((ret = __db_appname( + env, DB_APP_BLOB, NULL, &dirp, &blob_dir)) != 0) + goto err; + + if ((ret = __rep_add_files_to_list( + env, blob_dir, NULL, context, &bmd, 1)) != 0) + goto err; + + if ((ret = __os_dirlist(env, blob_dir, 1, &dirs, &cnt)) != 0) + goto err; + + __os_free(env, name); + name = NULL; + if ((ret = __os_malloc( + env, MAX_BLOB_PATH_SZ + strlen(blob_dir), &name)) != 0) + goto err; + + for (i = 0; i < cnt; i++) { + /* + * Skip blob files and the top level BMD + * (which was handled above). + */ + if (IS_BLOB_META(dirs[i]) || IS_BLOB_FILE(dirs[i])) + continue; + len = strlen(blob_dir) + + strlen(dirs[i]) + strlen(BLOB_META_FILE_NAME) + 3; + (void)snprintf(name, len, "%s%c%s%c%s", blob_dir, + PATH_SEPARATOR[0], dirs[i], PATH_SEPARATOR[0], + BLOB_META_FILE_NAME); + /* + * If a blob meta database exists, add it to the list, and move + * on to the next directory, otherwise get a directory list and + * check the second layer for BMD. If a directory contains a + * BMD, then it cannot contain subdirectories with BMD. 
+ */ + if (__os_exists(env, name, NULL) == 0) { + (void)snprintf(blob_sub, + strlen(dirs[i]) + strlen(bmd) + 2, + "%s%c%s", dirs[i], PATH_SEPARATOR[0], bmd); + if ((ret = __rep_add_files_to_list(env, blob_dir, + NULL, context, (const char **)&blob_sub, 1)) != 0) + goto err; + } else { + len = strlen(blob_dir) + strlen(dirs[i]) + 2; + (void)snprintf(name, len, "%s%c%s", + blob_dir, PATH_SEPARATOR[0], dirs[i]); + if ((ret = __os_dirlist( + env, name, 1, &subdirs, &cnt2)) != 0) + goto err; + if (name2 == NULL) { + if ((ret = __os_malloc(env, + MAX_BLOB_PATH_SZ + strlen(name), + &name2)) != 0) + goto err; + } + for (j = 0; j < cnt2; j++) { + if (IS_BLOB_FILE(subdirs[j])) + continue; + len = strlen(name) + strlen(subdirs[j]) + + strlen(BLOB_META_FILE_NAME) + 3; + (void)snprintf(name2, len, "%s%c%s%c%s", + name, PATH_SEPARATOR[0], subdirs[j], + PATH_SEPARATOR[0], BLOB_META_FILE_NAME); + if ((ret = __os_exists( + env, name2, NULL)) == 0) { + len = strlen(dirs[i]) + + strlen(subdirs[j]) + + strlen(bmd) + 3; + (void)snprintf(blob_sub, + len, "%s%c%s%c%s", dirs[i], + PATH_SEPARATOR[0], subdirs[j], + PATH_SEPARATOR[0], bmd); + if ((ret = __rep_add_files_to_list( + env, blob_dir, NULL, context, + (const char **)&blob_sub, 1)) != 0) + goto err; + } + } + __os_dirfree(env, subdirs, cnt2); + subdirs = NULL; + } + } + +err: if (name != NULL) + __os_free(env, name); + if (name2 != NULL) + __os_free(env, name2); + if (blob_dir != NULL) + __os_free(env, blob_dir); + if (dirs != NULL) + __os_dirfree(env, dirs, cnt); + if (subdirs != NULL) + __os_dirfree(env, subdirs, cnt2); + return (ret); +} + +/* * __rep_walk_dir -- * * This is the routine that walks a directory and fills in the structures @@ -284,11 +929,8 @@ __rep_walk_dir(env, dir, datadir, context) const char *dir, *datadir; FILE_LIST_CTX *context; { - __rep_fileinfo_args tmpfp; - size_t avail, len; - int cnt, first_file, i, ret; - u_int8_t uid[DB_FILE_ID_LEN]; - char *file, **names, *subdb; + int cnt, ret; + char **names; if 
(dir == NULL) { VPRINT(env, (env, DB_VERB_REP_SYNC, @@ -304,7 +946,34 @@ __rep_walk_dir(env, dir, datadir, context) } VPRINT(env, (env, DB_VERB_REP_SYNC, "Walk_dir: Dir %s has %d files", (dir == NULL) ? "INMEM" : dir, cnt)); + ret = __rep_add_files_to_list( + env, dir, datadir, context, (const char **)names, cnt); + + __os_dirfree(env, names, cnt); + return (ret); +} + +/* + * __rep_add_files_to_list -- + * + * Add the given files to the file list. + */ +static int +__rep_add_files_to_list(env, dir, datadir, context, names, cnt) + ENV *env; + const char *dir, *datadir; + FILE_LIST_CTX *context; + const char **names; + int cnt; +{ + __rep_fileinfo_args tmpfp; + size_t avail, len; + int first_file, i, ret; + u_int8_t uid[DB_FILE_ID_LEN]; + const char *file, *subdb; + first_file = 1; + ret = 0; for (i = 0; i < cnt; i++) { VPRINT(env, (env, DB_VERB_REP_SYNC, "Walk_dir: File %d name: %s", i, names[i])); @@ -372,15 +1041,19 @@ __rep_walk_dir(env, dir, datadir, context) DB_SET_DBT(tmpfp.uid, uid, DB_FILE_ID_LEN); retry: avail = (size_t)(&context->buf[context->size] - context->fillptr); + /* + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. + */ if (context->version < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. 
- */ ret = __rep_fileinfo_v6_marshal(env, context->version, (__rep_fileinfo_v6_args *)&tmpfp, context->fillptr, avail, &len); + else if (context->version < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, context->version, + (__rep_fileinfo_v7_args *)&tmpfp, + context->fillptr, avail, &len); else ret = __rep_fileinfo_marshal(env, context->version, &tmpfp, context->fillptr, avail, &len); @@ -409,9 +1082,7 @@ retry: avail = (size_t)(&context->buf[context->size] - */ context->fillptr += len; } -err: - __os_dirfree(env, names, cnt); - return (ret); +err: return (ret); } /* @@ -430,7 +1101,7 @@ __rep_is_replicated_db(name, dir) /* * Remaining things that don't have a "__db" prefix are eligible. */ - if (!IS_DB_FILE(name)) + if (!IS_DB_FILE(name) || IS_BLOB_META(name)) return (1); /* Here, we know we have a "__db" name. */ @@ -470,7 +1141,7 @@ __rep_check_uid(env, rfp, uid) if (memcmp(rfp->uid.data, uid, DB_FILE_ID_LEN) == 0) { VPRINT(env, (env, DB_VERB_REP_SYNC, "Check_uid: Found matching file.")); - ret = DB_KEYEXIST; + ret = USR_ERR(env, DB_KEYEXIST); } return (ret); @@ -489,6 +1160,7 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid) DB_THREAD_INFO *ip; PAGE *pagep; int lorder, ret, t_ret; + u_int32_t flags; dbp = NULL; dbc = NULL; @@ -503,11 +1175,15 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid) * database handles would block the master from handling UPDATE_REQ. */ F_SET(dbp, DB_AM_RECOVER); - if ((ret = __db_open(dbp, ip, NULL, file, subdb, DB_UNKNOWN, - DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0), - 0, PGNO_BASE_MD)) != 0) + flags = DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? 
DB_THREAD : 0); + if (file != NULL && IS_BLOB_META(file)) + LF_SET(DB_INTERNAL_BLOB_DB); + if ((ret = __db_open(dbp, ip, NULL, + file, subdb, DB_UNKNOWN, flags, 0, PGNO_BASE_MD)) != 0) goto err; + SET_LO_HI_VAR(dbp->blob_file_id, rfp->blob_fid_lo, rfp->blob_fid_hi); + if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) goto err; if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, ip, dbc->txn, @@ -574,6 +1250,7 @@ __rep_page_req(env, ip, eid, rp, rec) { __rep_fileinfo_args *msgfp, msgf; __rep_fileinfo_v6_args *msgfpv6; + __rep_fileinfo_v7_args *msgfpv7; DB_MPOOLFILE *mpf; DB_REP *db_rep; REP *rep; @@ -584,21 +1261,30 @@ __rep_page_req(env, ip, eid, rp, rec) db_rep = env->rep_handle; rep = db_rep->region; + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (rp->rep_version < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. 
- */ if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version, &msgfpv6, rec->data, rec->size, &next)) != 0) return (ret); memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args)); msgf.dir.data = NULL; msgf.dir.size = 0; + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; msgfp = &msgf; msgfree = msgfpv6; + } else if (rp->rep_version < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version, + &msgfpv7, rec->data, rec->size, &next)) != 0) + return (ret); + memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args)); + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; + msgfp = &msgf; + msgfree = msgfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version, &msgfp, rec->data, rec->size, &next)) != 0) @@ -624,7 +1310,7 @@ __rep_page_req(env, ip, eid, rp, rec) (void)__rep_send_message(env, eid, REP_FILE_FAIL, NULL, rec, 0, 0); else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto err; } @@ -738,7 +1424,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) #ifdef HAVE_QUEUE if ((ret = __qam_fget(qdbc, &p, 0, &pagep)) == ENOENT) #endif - ret = DB_PAGE_NOTFOUND; + ret = USR_ERR(env, DB_PAGE_NOTFOUND); } else ret = __memp_fget(mpf, &p, ip, NULL, 0, &pagep); msgfp->pgno = p; @@ -748,16 +1434,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) RPRINT(env, (env, DB_VERB_REP_SYNC, "sendpages: PAGE_FAIL on page %lu", (u_long)p)); + /* + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. + */ if (rp->rep_version < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. 
- */ ret = __rep_fileinfo_v6_marshal(env, rp->rep_version, (__rep_fileinfo_v6_args *)msgfp, buf, msgsz, &len); + else if (rp->rep_version < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, + rp->rep_version, + (__rep_fileinfo_v7_args *)msgfp, + buf, msgsz, &len); else ret = __rep_fileinfo_marshal(env, rp->rep_version, msgfp, buf, @@ -772,7 +1463,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0); continue; } else - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto err; } else if (ret != 0) goto err; @@ -796,16 +1487,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) RPRINT(env, (env, DB_VERB_REP_SYNC, "sendpages: %lu, page lsn [%lu][%lu]", (u_long)p, (u_long)pagep->lsn.file, (u_long)pagep->lsn.offset)); + /* + * It is safe to cast to the old structs + * because the first part of the current + * structs matches the old struct. + */ if (rp->rep_version < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, rp->rep_version, (__rep_fileinfo_v6_args *)msgfp, buf, msgsz, &len); + else if (rp->rep_version < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, + rp->rep_version, + (__rep_fileinfo_v7_args *)msgfp, + buf, msgsz, &len); else ret = __rep_fileinfo_marshal(env, rp->rep_version, msgfp, buf, msgsz, &len); @@ -1010,7 +1706,8 @@ __rep_update_setup(env, eid, rp, rec, savetime, lsn) ZERO_LSN(lp->waiting_lsn); ZERO_LSN(lp->max_wait_lsn); ZERO_LSN(lp->max_perm_lsn); - if (db_rep->rep_db == NULL) + ret = __rep_blob_cleanup(env, rep); + if (ret == 0 && db_rep->rep_db == NULL) ret = __rep_client_dbinit(env, 0, REP_DB); MUTEX_UNLOCK(env, rep->mtx_clientdb); if (ret != 0) @@ -1148,6 +1845,337 @@ err: /* return (ret); } +/* + * __rep_blob_update + * Prepare to receive blob file data by setting up the blob gap database, + * then requesting the blob file data. 
+ * + * PUBLIC: int __rep_blob_update __P((ENV *, int, DB_THREAD_INFO *, DBT *)); + */ +int +__rep_blob_update(env, eid, ip, rec) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + DBT *rec; +{ + DBC *dbc; + DB_REP *db_rep; + DBT data, key; + REP *rep; + REGINFO *infop; + __rep_blob_file_args rbf; + __rep_blob_update_args rbu; + __rep_fileinfo_args *rfp; + db_seq_t blob_fid; + int ret; + off_t offset; + size_t len; + u_int32_t num_blobs; + u_int8_t keybuf[BLOB_KEY_SIZE], *ptr; + + db_rep = env->rep_handle; + rep = db_rep->region; + infop = env->reginfo; + rfp = NULL; + dbc = NULL; + memset(&rbu, 0, sizeof(__rep_blob_update_args)); + memset(&rbf, 0, sizeof(__rep_blob_file_args)); + + if ((ret = __rep_blob_update_unmarshal( + env, &rbu, rec->data, rec->size, &ptr)) != 0) + return (ret); + len = rec->size - __REP_BLOB_UPDATE_SIZE; + + RPRINT(env, (env, DB_VERB_REP_SYNC, +"blob_update: file_id %llu, num_blobs %lu, flags %lu, highest %llu", + (long long)rbu.blob_fid, (long)rbu.num_blobs, + (unsigned long)rbu.flags, (long long)rbu.highest_id)); + + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + + /* + * Check if the world changed. + */ + if (rep->sync_state != SYNC_PAGE) + goto unlock; + + /* Make sure this is for the current database. */ + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret); + if (ret != 0) + goto unlock; + + if (blob_fid != (db_seq_t)rbu.blob_fid) + goto unlock; + + rep->highest_id = (db_seq_t)rbu.highest_id; + /* + * For each blob file, add an entry to the database for each 1 MB + * section of that file. The entries will be deleted as the + * corresponding blob chunks arrive and are written to disk. 
+ */ + if (db_rep->blob_dbp == NULL && + (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) + goto unlock; + + if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0) + goto unlock; + + /* + * Make sure no one else has populated the database, this could happen + * if the update message is sent twice. + */ + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != DB_NOTFOUND) + goto unlock; + + /* It is possible for a blob database to have no blobs. */ + if (rbu.num_blobs == 0) { + (void)__dbc_close(dbc); + dbc = NULL; + rep->blob_more_files = 0; + rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0; + rep->last_blob_id = rep->last_blob_sid = 0; + rep->prev_blob_id = rep->prev_blob_sid = 0; + rep->gap_bl_hi_off = 0; + rep->blob_sync = 0; + rep->highest_id = 0; + rep->blob_rereq = 0; + ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0); + goto unlock; + } + + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + data.flags = key.flags = DB_DBT_USERMEM; + key.data = keybuf; + key.ulen = key.size = BLOB_KEY_SIZE; + data.data = (void *)&offset; + data.ulen = data.size = sizeof(offset); + num_blobs = 0; + while (num_blobs < rbu.num_blobs) { + if ((ret = + __rep_blob_file_unmarshal(env, &rbf, ptr, len, &ptr)) != 0) + goto unlock; + len -= __REP_BLOB_FILE_SIZE; + + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_update adding file: blob_id %llu, sdb_id %llu, blob_size %llu", + (long long)rbf.blob_id, (long long)rbf.blob_sid, + (long long)rbf.blob_size)); + + memcpy(keybuf, &rbf.blob_sid, BLOB_ID_SIZE); + memcpy(&(keybuf[BLOB_ID_SIZE]), &rbf.blob_id, BLOB_ID_SIZE); + offset = 0; + /* + * Add an entry for each megabyte of the blob file. Zero + * length blob files should have at least one entry. 
+ */ + do { + if ((ret = __dbc_put(dbc, &key, &data, 0)) != 0) + goto unlock; + offset += MEGABYTE; + /* + * Check for overflow, this can happen when the master + * supports 64 file offsets, but the client does not. + */ + if (offset < 0) { + __db_errx(env, + DB_STR("3704", + "Blob file offset overflow")); + ret = EINVAL; + goto unlock; + } + } while ((u_int32_t)offset < rbf.blob_size); + num_blobs++; + } + /* Set whether there are more files after the ones on the list. */ + if (F_ISSET(&rbu, BLOB_DONE)) + rep->blob_more_files = 0; + else + rep->blob_more_files = 1; + rep->prev_blob_id = rep->last_blob_id; + rep->prev_blob_sid = rep->last_blob_sid; + rep->last_blob_sid = (db_seq_t)rbf.blob_sid; + rep->last_blob_id = (db_seq_t)rbf.blob_id; + + /* + * Send the same message payload in a REP_BLOB_ALL_REQ message to get + * the blob data. Peer-to-peer initialization is not supported for + * blobs, so we can only send this back to the master despite the fact + * that building the list of blob files is expensive. + */ + (void)__rep_send_message( + env, rep->master_id, REP_BLOB_ALL_REQ, NULL, rec, 0, 0); + +unlock: REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (dbc != NULL) + (void)__dbc_close(dbc); + + return (ret); +} + +/* + * __rep_blob_allreq + * Request blob file data. 
+ * + * PUBLIC: int __rep_blob_allreq __P((ENV *, int, DBT *)); + */ +int +__rep_blob_allreq(env, eid, rec) + ENV *env; + int eid; + DBT *rec; +{ + DB *dbp; + DB_FH *fhp; + DBT msg; + __rep_blob_chunk_args rbc; + __rep_blob_file_args rbf; + __rep_blob_update_args rbu; + db_seq_t old_sdb_id; + int done, ret; + off_t offset; + size_t len; + u_int32_t num_blobs; + u_int8_t *chunk_buf, *msg_buf, *ptr; + + dbp = NULL; + fhp = NULL; + chunk_buf = msg_buf = NULL; + memset(&rbu, 0, sizeof(__rep_blob_update_args)); + memset(&rbc, 0, sizeof(__rep_blob_chunk_args)); + memset(&msg, 0, sizeof(DBT)); + + if ((ret = + __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &msg_buf)) != 0) + goto err; + msg.data = msg_buf; + msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE; + if ((ret = __os_malloc(env, MEGABYTE, &chunk_buf)) != 0) + goto err; + rbc.data.data = chunk_buf; + rbc.data.ulen = MEGABYTE; + rbc.data.flags = DB_DBT_USERMEM; + + /* + * The REP_BLOB_ALL_REQ message sends the REP_BLOB_UPDATE message + * payload back to the master to request the actual blobs after the + * client has prepared itself to receive them. + */ + len = rec->size; + if ((ret = __rep_blob_update_unmarshal( + env, &rbu, rec->data, rec->size, &ptr)) != 0) + goto err; + len -= __REP_BLOB_UPDATE_SIZE; + + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_all_req: file_id %llu, num_blobs %lu, flags %lu", + (long long)rbu.blob_fid, (long)rbu.num_blobs, + (unsigned long)rbu.flags)); + + if ((ret = __db_create_internal(&dbp, env, 0)) != 0) + goto err; + dbp->blob_file_id = (db_seq_t)rbu.blob_fid; + rbc.blob_fid = rbu.blob_fid; + num_blobs = 0; + /* + * The list of files to send is included in the message, go + * through the list and send each file in pieces. 
+ */ + while (num_blobs < rbu.num_blobs) { + num_blobs++; + if ((ret = __rep_blob_file_unmarshal( + env, &rbf, ptr, len, &ptr)) != 0) + goto err; + len -= __REP_BLOB_FILE_SIZE; + old_sdb_id = dbp->blob_sdb_id; + dbp->blob_sdb_id = (db_seq_t)rbf.blob_sid; + rbc.flags = 0; + rbc.blob_sid = rbf.blob_sid; + rbc.blob_id = rbf.blob_id; + /* Free the sub-directory information if it has changed. */ + if (old_sdb_id != dbp->blob_sdb_id && + dbp->blob_sub_dir != NULL) { + __os_free(env, dbp->blob_sub_dir); + dbp->blob_sub_dir = NULL; + } + if (dbp->blob_sub_dir == NULL) { + if ((ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir, + dbp->blob_file_id, dbp->blob_sdb_id)) != 0) + goto err; + } + if ((ret = __blob_file_open(dbp, + &fhp, (db_seq_t)rbf.blob_id, DB_FOP_READONLY, 0)) != 0) { + /* + * The file may have been deleted between creating the + * list and sending the data. Send a message saying + * the file has been deleted. + */ + if (ret == ENOENT) { + F_SET(&rbc, BLOB_DELETE); + rbc.data.size = 0; + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE; + (void)__rep_send_message(env, + eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + ret = 0; + fhp = NULL; + continue; + } + goto err; + } + offset = 0; + do { + done = 0; + rbc.flags = 0; + if ((ret = __blob_file_read( + env, fhp, &rbc.data, offset, MEGABYTE)) != 0) + goto err; + DB_ASSERT(env, rbc.data.size <= MEGABYTE); + + /* + * In rare cases the blob file may have gotten shorter + * since the list was created. + */ + if (rbc.data.size < (u_int32_t)MEGABYTE && (u_int64_t) + (offset + rbc.data.size) < rbf.blob_size) { + F_SET(&rbc, BLOB_CHUNK_FAIL); + done = 1; + } + /* File may have grown since the list was made. 
*/ + if ((u_int64_t) + (offset + rbc.data.size) > rbf.blob_size) { + rbc.data.size = + (u_int32_t)((off_t)rbf.blob_size - offset); + } + rbc.offset = (u_int64_t)offset; + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE + rbc.data.size; + (void)__rep_send_message( + env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + offset += MEGABYTE; + } while ((u_int64_t)offset < rbf.blob_size && !done); + + if (fhp != NULL && (ret = __os_closehandle(env, fhp)) != 0) + goto err; + fhp = NULL; + } +err: if (chunk_buf != NULL) + __os_free(env, chunk_buf); + if (msg_buf != NULL) + __os_free(env, msg_buf); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (dbp != 0) + (void)__db_close(dbp, NULL, 0); + return (ret); +} + static int __rep_find_inmem(env, rfp, unused) ENV *env; @@ -1157,6 +2185,11 @@ __rep_find_inmem(env, rfp, unused) COMPQUIET(env, NULL); COMPQUIET(unused, NULL); + /* + * Cannot assume all databases are in-memory because abbreviated + * internal inits from 5.3 and earlier are not limited to in-memory + * databases. + */ return (FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? DB_KEYEXIST : 0); } @@ -1172,12 +2205,9 @@ __rep_remove_nimdbs(env) FILE_LIST_CTX context; int ret; - if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0) + if ((ret = __rep_init_file_list_context(env, + DB_REPVERSION, 0, 0, &context)) != 0) return (ret); - context.size = MEGABYTE; - context.count = 0; - context.fillptr = context.buf; - context.version = DB_REPVERSION; /* NB: "NULL" asks walk_dir to consider only in-memory DBs */ if ((ret = __rep_walk_dir(env, NULL, NULL, &context)) != 0) @@ -1240,14 +2270,11 @@ __rep_remove_all(env, msg_version, rec) * 1. Get list of databases currently present at this client, which we * intend to remove. */ - if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0) - return (ret); - context.size = MEGABYTE; - context.count = 0; - context.version = DB_REPVERSION; /* Reserve space for the marshaled update_args. 
*/ - context.fillptr = FIRST_FILE_PTR(context.buf); + if ((ret = __rep_init_file_list_context(env, + DB_REPVERSION, 0, 1, &context)) != 0) + return (ret); if ((ret = __rep_find_dbs(env, &context)) != 0) goto out; @@ -1333,6 +2360,9 @@ __rep_remove_all(env, msg_version, rec) FIRST_FILE_PTR(context.buf), context.size, context.count, __rep_remove_file, NULL)) != 0) goto out; + /* Remove the blob directory. */ + if ((ret = __blob_del_hierarchy(env)) != 0) + goto out; /* * 4. Safe-store the (new) list of database files we intend to copy from @@ -1445,6 +2475,8 @@ __rep_remove_file(env, rfp, unused) #ifdef HAVE_QUEUE DB_THREAD_INFO *ip; #endif + APPNAME appname; + db_seq_t blob_fid, blob_sid; char *name; int ret, t_ret; @@ -1496,29 +2528,53 @@ __rep_remove_file(env, rfp, unused) * That will only have removed extent files. Now * we need to deal with the actual file itself. */ + appname = __rep_is_internal_rep_file(rfp->info.data) ? + DB_APP_META : (IS_BLOB_META(rfp->info.data) ? + DB_APP_BLOB : DB_APP_DATA); if (FLD_ISSET(rfp->db_flags, DB_AM_INMEM)) { if ((ret = __db_create_internal(&dbp, env, 0)) != 0) return (ret); MAKE_INMEM(dbp); F_SET(dbp, DB_AM_RECOVER); /* Skirt locking. */ ret = __db_inmem_remove(dbp, NULL, name); - } else if ((ret = __fop_remove(env, - NULL, rfp->uid.data, name, (const char **)&rfp->dir.data, - __rep_is_internal_rep_file(rfp->info.data) ? - DB_APP_META : DB_APP_DATA, 0)) != 0) + } else if ((ret = __fop_remove(env, NULL, rfp->uid.data, name, + (const char **)&rfp->dir.data, appname, 0)) != 0) { /* * If fop_remove fails, it could be because * the client has a different data_dir * structure than the master. Retry with the - * local, default settings. + * local, default settings. */ ret = __fop_remove(env, - NULL, rfp->uid.data, name, NULL, - __rep_is_internal_rep_file(rfp->info.data) ? 
- DB_APP_META : DB_APP_DATA, 0); -#ifdef HAVE_QUEUE -out: + NULL, rfp->uid.data, name, NULL, appname, 0); +#ifdef DB_WIN32 + /* + * Deleting a blob meta database can result in a + * ERROR_PATH_NOT_FOUND error on windows, so treat + * that as an ENOENT. + */ + if (__os_posix_err(ret) == ENOENT) + ret = ENOENT; #endif + } + /* Clean any blob directories. */ + if (ret == 0 && appname == DB_APP_BLOB) { + /* dbp has not been set, since queues do not support blobs. */ + DB_ASSERT(env, dbp == NULL); + if ((ret = __db_create_internal(&dbp, env, 0)) != 0) + goto out; + if ((ret = __blob_path_to_dir_ids( + env, name, &blob_fid, &blob_sid)) != 0) + goto out; + /* blob_fid == 0 if it is the top level blob meta db. */ + if (blob_fid != 0) { + dbp->blob_file_id = blob_fid; + dbp->blob_sdb_id = blob_sid; + if ((ret = __blob_del_all(dbp, NULL, 0)) != 0) + goto out; + } + } +out: if (dbp != NULL && (t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0) ret = t_ret; @@ -1610,10 +2666,11 @@ __rep_page(env, ip, eid, rp, rec) { DB_REP *db_rep; - DBT key, data; + DBT data, key; REP *rep; __rep_fileinfo_args *msgfp, msgf; __rep_fileinfo_v6_args *msgfpv6; + __rep_fileinfo_v7_args *msgfpv7; db_recno_t recno; int ret; char *msg; @@ -1647,21 +2704,30 @@ __rep_page(env, ip, eid, rp, rec) (u_long)rep->first_lsn.offset)); return (DB_REP_PAGEDONE); } + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (rp->rep_version < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. 
- */ if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version, &msgfpv6, rec->data, rec->size, NULL)) != 0) return (ret); memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args)); msgf.dir.data = NULL; msgf.dir.size = 0; + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; msgfp = &msgf; msgfree = msgfpv6; + } else if (rp->rep_version < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version, + &msgfpv7, rec->data, rec->size, NULL)) != 0) + return (ret); + memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args)); + msgf.blob_fid_lo = msgf.blob_fid_hi = 0; + msgfp = &msgf; + msgfree = msgfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version, &msgfp, rec->data, rec->size, NULL)) != 0) @@ -1671,9 +2737,9 @@ __rep_page(env, ip, eid, rp, rec) MUTEX_LOCK(env, rep->mtx_clientdb); REP_SYSTEM_LOCK(env); /* - * Check if the world changed. + * Check if the world changed or if we are in the blob sync phase. */ - if (rep->sync_state != SYNC_PAGE) { + if (rep->sync_state != SYNC_PAGE || rep->blob_sync != 0) { ret = DB_REP_PAGEDONE; goto err; } @@ -1785,6 +2851,218 @@ err: REP_SYSTEM_UNLOCK(env); } /* + * __rep_blob_chunk + * Process a blob chunk message. When a blob chunk arrives, delete its + * entry in the blob chunk gap database to show that it has arrived, and + * write the data to the blob file. 
+ * + * PUBLIC: int __rep_blob_chunk __P((ENV *, int, DB_THREAD_INFO *, DBT *)); + */ +int +__rep_blob_chunk(env, eid, ip, rec) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + DBT *rec; +{ + DB_REP *db_rep; + DBC *dbc; + DB_FH *fhp; + DBT data, key; + REP *rep; + REGINFO *infop; + __rep_blob_chunk_args rbc; + __rep_fileinfo_args *rfp; + db_seq_t blob_fid; + char *blob_sub_dir, *last, *mkpath, *name, *path; + int ret; + off_t offset; + u_int8_t keybuf[BLOB_KEY_SIZE], *ptr; + + ret = 0; + db_rep = env->rep_handle; + rep = db_rep->region; + infop = env->reginfo; + dbc = NULL; + blob_sub_dir = name = NULL; + path = NULL; + fhp = NULL; + + if (rep->sync_state != SYNC_PAGE) + return (DB_REP_PAGEDONE); + + if ((ret = __rep_blob_chunk_unmarshal( + env, &rbc, rec->data, rec->size, &ptr)) != 0) + return (ret); + + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + /* + * Check if the world changed. + */ + if (rep->sync_state != SYNC_PAGE) { + ret = DB_REP_PAGEDONE; + goto err; + } + /* + * We should not ever be in internal init with a lease granted. + */ + DB_ASSERT(env, + !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0); + + /* Make sure this is for the current file. */ + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret); + if (ret != 0) + goto err; + + if (blob_fid != (db_seq_t)rbc.blob_fid) { + ret = DB_REP_PAGEDONE; + goto err; + } + + RPRINT(env, (env, DB_VERB_REP_SYNC, +"REP_BLOB_CHUNK: blob_fid %llu, blob_sid %llu, blob_id %llu, offset %llu", + (unsigned long long)rbc.blob_fid, + (unsigned long long)rbc.blob_sid, + (unsigned long long)rbc.blob_id, (long long)rbc.offset)); + + if (db_rep->blob_dbp == NULL && + (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) { + RPRINT(env, (env, DB_VERB_REP_SYNC, + "REP_BLOB_CHUNK: Client_dbinit %s", + db_strerror(ret))); + goto err; + } + + /* Set the highest blob chunk received. 
*/ + if (rbc.blob_sid > (u_int64_t)rep->gap_bl_hi_sid || + (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid && + rbc.blob_id > (u_int64_t)rep->gap_bl_hi_id) || + (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid && + rbc.blob_id == (u_int64_t)rep->gap_bl_hi_id && + rbc.offset > (u_int64_t)rep->gap_bl_hi_off)) { + rep->gap_bl_hi_id = (db_seq_t)rbc.blob_id; + rep->gap_bl_hi_sid = (db_seq_t)rbc.blob_sid; + rep->gap_bl_hi_off = (off_t)rbc.offset; + } + + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + data.flags = key.flags = DB_DBT_USERMEM; + key.data = keybuf; + key.ulen = key.size = BLOB_KEY_SIZE; + data.data = (void *)&offset; + data.ulen = data.size = sizeof(offset); + /* BLOB_DELETE is set if the blob file was deleted. */ + if (F_ISSET(&rbc, BLOB_DELETE)) { + memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE); + memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE); + if ((ret = __db_del( + db_rep->blob_dbp, ip, NULL, &key, 0)) != 0) { + if (ret == DB_NOTFOUND) + ret = 0; + goto err; + } + goto done; + } + + if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0) + goto err; + offset = (off_t)rbc.offset; + memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE); + memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE); + /* If not found we have already dealt with this chunk. */ + if ((ret = __dbc_get(dbc, &key, &data, DB_GET_BOTH)) != 0) { + if (ret == DB_NOTFOUND) { + ret = 0; + goto done; + } + goto err; + } + /* + * BLOB_CHUNK_FAIL is set if the blob file was truncated to shorter + * than the BLOB_CHUNK offset. 
+ */ + if (F_ISSET(&rbc, BLOB_CHUNK_FAIL)) { + while (ret == 0) { + if ((ret = __dbc_del(dbc, 0)) != 0) + goto err; + ret = __dbc_get(dbc, &key, &data, DB_NEXT_DUP); + } + if (ret == DB_NOTFOUND) + ret = 0; + if ((ret = __dbc_close(dbc)) != 0) + goto err; + dbc = NULL; + goto done; + } + if ((ret = __dbc_del(dbc, 0)) != 0) + goto err; + if ((ret = __dbc_close(dbc)) != 0) + goto err; + dbc = NULL; + + if ((ret = __blob_make_sub_dir(env, &blob_sub_dir, + (db_seq_t)rbc.blob_fid, (db_seq_t)rbc.blob_sid)) != 0) + goto err; + + if ((ret = __blob_id_to_path( + env, blob_sub_dir, (db_seq_t)rbc.blob_id, &name)) != 0) + goto err; + + if ((ret = __db_appname(env, DB_APP_BLOB, name, NULL, &path)) != 0 ) + goto err; + + last = __db_rpath(path); + DB_ASSERT(env, last != NULL); + *last = '\0'; + if (__os_exists(env, path, NULL) != 0) { + *last = PATH_SEPARATOR[0]; + mkpath = path; +#ifdef DB_WIN32 + /* + * Absolute paths on windows can result in it creating a "C" + * or "D" directory in the working directory. + */ + if (__os_abspath(mkpath)) + mkpath += 2; +#endif + if ((ret = __db_mkpath(env, mkpath)) != 0) + goto err; + } + *last = PATH_SEPARATOR[0]; + if ((ret = __os_open( + env, path, 0, DB_OSO_CREATE, env->db_mode, &fhp)) != 0) + goto err; + + /* Write the data into the blob file. */ + if ((ret = __fop_write_file(env, NULL, name, NULL, DB_APP_BLOB, + fhp, (off_t)rbc.offset, rbc.data.data, rbc.data.size, 0)) != 0) + goto err; + if ((ret = __os_closehandle(env, fhp)) != 0) + goto err; + fhp = NULL; + +done: ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0); + +err: REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (path != NULL) + __os_free(env, path); + if (blob_sub_dir != NULL) + __os_free(env, blob_sub_dir); + if (name != NULL) + __os_free(env, name); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (dbc != NULL) + (void)__dbc_close(dbc); + + return (ret); +} + +/* * __rep_write_page - * Write this page into a database. 
*/ @@ -1801,13 +3079,16 @@ __rep_write_page(env, ip, rep, msgfp) DB_PGINFO *pginfo; DB_REP *db_rep; REGINFO *infop; + APPNAME appname; __rep_fileinfo_args *rfp; + char *blob_path; int ret; void *dst; db_rep = env->rep_handle; infop = env->reginfo; rfp = NULL; + blob_path = NULL; /* * If this is the first page we're putting in this database, we need @@ -1830,15 +3111,39 @@ __rep_write_page(env, ip, rep, msgfp) RPRINT(env, (env, DB_VERB_REP_SYNC, "rep_write_page: Calling fop_create for %s", (char *)rfp->info.data)); + appname = (__rep_is_internal_rep_file(rfp->info.data) ? + DB_APP_META : (IS_BLOB_META((char *)rfp->info.data) + ? DB_APP_BLOB : DB_APP_DATA)); + /* + * May have to create the directory structure for blob + * metadata databases. + */ + if (appname == DB_APP_BLOB) { + if ((ret = __db_appname(env, + appname, rfp->info.data, + (const char **)&rfp->dir.data, + &blob_path)) != 0) + goto err; +#ifdef DB_WIN32 + /* + * Absolute paths on windows can result in + * it creating a "C" or "D" + * directory in the working directory. + */ + if (__os_abspath(blob_path)) + blob_path += 2; +#endif + if ((ret = __db_mkpath(env, blob_path)) != 0) + goto err; + } if ((ret = __fop_create(env, NULL, NULL, rfp->info.data, (const char **)&rfp->dir.data, - __rep_is_internal_rep_file(rfp->info.data) ? - DB_APP_META : DB_APP_DATA, env->db_mode, 0)) != 0) { + appname, env->db_mode, 0)) != 0) { /* * If fop_create fails, it could be because * the client has a different data_dir * structure than the master. Retry with the - * local, default settings. + * local, default settings. */ RPRINT(env, (env, DB_VERB_REP_SYNC, "rep_write_page: fop_create ret %d. 
Retry for %s, master datadir %s", @@ -1929,7 +3234,10 @@ __rep_write_page(env, ip, rep, msgfp) ret = __memp_fput(db_rep->file_mpf, ip, dst, db_rep->file_dbp->priority); -err: return (ret); +err: if (blob_path != NULL) + __os_free(env, blob_path); + + return (ret); } /* @@ -1976,7 +3284,7 @@ __rep_page_gap(env, rep, msgfp, type) * Make sure we're still talking about the same file. * If not, we're done here. */ - if (rfp->filenum != msgfp->filenum) { + if (rfp->filenum != msgfp->filenum || rep->blob_sync != 0) { ret = DB_REP_PAGEDONE; goto err; } @@ -2135,6 +3443,53 @@ err: } /* + * __rep_blob_cleanup - + * Clean up blob internal init information. + * + * Caller must hold client database mutex (mtx_clientdb) and + * REP_SYSTEM_LOCK. + */ +static int +__rep_blob_cleanup(env, rep) + ENV *env; + REP *rep; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + int ret, t_ret; + u_int32_t count; + + ret = 0; + db_rep = env->rep_handle; + + /* + * Delete any remaining records in the blob chunk database. The blob + * chunk database contains descriptions of the blob chunks that have + * yet to arrive. If not deleted, the remaining records could + * interfere with how the next REP_BLOB_UPDATE message is handled. + */ + if (db_rep->blob_dbp != NULL) { + ENV_GET_THREAD_INFO(env, ip); + ret = __db_truncate(db_rep->blob_dbp, ip, NULL, &count); + t_ret = __db_close(db_rep->blob_dbp, NULL, DB_NOSYNC); + if (ret == 0) + ret = t_ret; + db_rep->blob_dbp = NULL; + } + /* Reset blob internal init control values. */ + rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0; + rep->last_blob_id = rep->last_blob_sid = 0; + rep->prev_blob_id = rep->prev_blob_sid = 0; + rep->gap_bl_hi_off = 0; + rep->blob_more_files = 0; + rep->blob_sync = 0; + rep->highest_id = 0; + rep->blob_rereq = 0; + + return (ret); +} + +/* * __rep_init_cleanup - * Clean up internal initialization pieces. * @@ -2162,9 +3517,10 @@ __rep_init_cleanup(env, rep, force) /* * 1. Close up the file data pointer we used. * 2. 
Close/reset the page database. - * 3. Close/reset the queue database if we're forcing a cleanup. - * 4. Free current file info. - * 5. If we have all files or need to force, free original file info. + * 3. Close/truncate the blob chunk gap database. + * 4. Close/reset the queue database if we're forcing a cleanup. + * 5. Free current file info. + * 6. If we have all files or need to force, free original file info. */ if (db_rep->file_mpf != NULL) { ret = __memp_fclose(db_rep->file_mpf, 0); @@ -2176,6 +3532,15 @@ __rep_init_cleanup(env, rep, force) if (ret == 0) ret = t_ret; } + /* + * Truncate the blob chunk gap database, since entries in the database + * are for blob chunks we are expecting to arrive. Also reset blob + * internal init control values. + */ + t_ret = __rep_blob_cleanup(env, rep); + if (ret == 0) + ret = t_ret; + if (force && db_rep->queue_dbc != NULL) { queue_dbp = db_rep->queue_dbc->dbp; if ((t_ret = __dbc_close(db_rep->queue_dbc)) != 0 && ret == 0) @@ -2324,8 +3689,8 @@ __rep_clean_interrupted(env) * __rep_filedone - * We need to check if we're done with the current file after * processing the current page. Stat the database to see if - * we have all the pages. If so, we need to clean up/close - * this one, set up for the next one, and ask for its pages, + * we have all the pages and blobs. If so, we need to clean up/close + * this one, set up for the next one, and ask for its pages and blobs, * or if this is the last file, request the log records and * move to the REP_RECOVER_LOG state. 
*/ @@ -2338,9 +3703,14 @@ __rep_filedone(env, ip, eid, rep, msgfp, type) __rep_fileinfo_args *msgfp; u_int32_t type; { + DBT msg; REGINFO *infop; __rep_fileinfo_args *rfp; + __rep_blob_update_req_args rbur; int ret; + u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE]; + + memset(&msg, 0, sizeof(DBT)); /* * We've put our page, now we need to do any gap processing @@ -2375,8 +3745,96 @@ __rep_filedone(env, ip, eid, rep, msgfp, type) ((ret = __rep_queue_filedone(env, ip, rep, rfp)) != DB_REP_PAGEDONE)) return (ret); + + /* Request blob files. */ + if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) { + ret = 0; + rep->blob_sync = 1; + memset(&rbur, 0, sizeof(__rep_blob_update_req_args)); + GET_LO_HI(env, + rfp->blob_fid_lo, rfp->blob_fid_hi, rbur.blob_fid, ret); + msg.size = __REP_BLOB_UPDATE_REQ_SIZE; + msg.data = buf; + __rep_blob_update_req_marshal(env, &rbur, msg.data); + (void)__rep_send_message(env, + rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0); + return (ret); + } + + /* + * We have all the data for this file. Clean up. + */ + if ((ret = __rep_init_cleanup(env, rep, 0)) != 0) + return (ret); + + rep->curfile++; + ret = __rep_nextfile(env, eid, rep); + + return (ret); +} + +/* + * __rep_blobdone - + * We need to check if we're done with the current file after + * processing the current blob chunk. + * + * Caller must hold client database mutex (mtx_clientdb) and + * REP_SYSTEM_LOCK. + */ +static int +__rep_blobdone(env, eid, ip, rep, blob_fid, force) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + REP *rep; + db_seq_t blob_fid; + int force; +{ + DBT msg; + __rep_blob_update_req_args rbur; + int done, ret; + u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE]; + /* - * We have all the pages for this file. Clean up. + * We've written our blob chunk, now we need to do any gap processing + * that might be needed to re-request chunks. 
+ */ + done = 0; + ret = __rep_blob_chunk_gap(env, eid, ip, rep, &done, blob_fid, force); + /* + * The world changed while we were doing gap processing. + * We're done here. + */ + if (ret == DB_REP_PAGEDONE) + return (0); + else if (ret != 0) + goto err; + + /* + * If the blob database is empty then all files in the current list + * have been processed. However, there may be more files on the + * master, so request the next list if that is the case. + */ + if (done && rep->blob_more_files) { + memset(&rbur, 0, sizeof(__rep_blob_update_req_args)); + rbur.blob_fid = (u_int64_t)blob_fid; + rbur.blob_sid = (u_int64_t)rep->last_blob_sid; + rbur.blob_id = (u_int64_t)rep->last_blob_id; + rbur.highest_id = (u_int64_t)rep->highest_id; + rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0; + rep->gap_bl_hi_off = 0; + rep->blob_rereq = 0; + msg.size = __REP_BLOB_UPDATE_REQ_SIZE; + msg.data = buf; + __rep_blob_update_req_marshal(env, &rbur, msg.data); + (void)__rep_send_message(env, + rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0); + return (0); + } else if (!done) + return (0); + + /* + * We have all the data for this file. Clean up. */ if ((ret = __rep_init_cleanup(env, rep, 0)) != 0) goto err; @@ -2388,6 +3846,255 @@ err: } /* + * __rep_blob_chunk_gap - + * We have written a blob chunk. Now check if there are any that need + * to be re-requested. The blob chunk gap database contains + * descriptions of all the blob chunks that have yet to arrive. + * + * Caller must hold client database mutex (mtx_clientdb) and + * REP_SYSTEM_LOCK. 
+ */ +static int +__rep_blob_chunk_gap(env, eid, ip, rep, done, blob_fid, force) + ENV *env; + int eid; + DB_THREAD_INFO *ip; + REP *rep; + int *done; + db_seq_t blob_fid; + int force; +{ + DBC *dbc; + DBT data, high, key, msg; + DB_LOG *dblp; + DB_REP *db_rep; + LOG *lp; + REGINFO *infop; + __rep_blob_chunk_req_args rbcr; + __rep_fileinfo_args *rfp; + db_seq_t cur_blob_fid; + off_t offset; + int ret; + u_int8_t buf[BLOB_KEY_SIZE], msgbuf[__REP_BLOB_CHUNK_REQ_SIZE]; + + db_rep = env->rep_handle; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + infop = env->reginfo; + ret = 0; + dbc = NULL; + *done = 0; + + /* eid will be used when peer-to-peer is re-enabled for blobs. */ + COMPQUIET(eid, 0); + + /* + * Make sure we're still talking about the same file. + * If not, we're done here. + */ + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, cur_blob_fid, ret); + if (cur_blob_fid != blob_fid) { + ret = DB_REP_PAGEDONE; + goto err; + } + + /* Get the first missing blob chunk. */ + if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0) + goto err; + memset(&key, 0, sizeof(DBT)); + memset(&data, 0, sizeof(DBT)); + ret = __dbc_get(dbc, &key, &data, DB_FIRST); + if (ret == DB_NOTFOUND) { + /* All blobs received. */ + ret = 0; + *done = 1; + goto err; + } else if (ret != 0) + goto err; + + DB_ASSERT(env, key.size == BLOB_KEY_SIZE); + DB_ASSERT(env, data.size == sizeof(off_t)); + offset = *(off_t *)data.data; + /* + * Format the sdbid and id of the high chunk as a blob gap + * database key, so it can be compared with the entries in that + * database. + */ + memset(&high, 0, sizeof(DBT)); + memcpy(buf, &rep->gap_bl_hi_sid, BLOB_ID_SIZE); + memcpy(buf + BLOB_ID_SIZE, &rep->gap_bl_hi_id, BLOB_ID_SIZE); + high.data = buf; + high.size = BLOB_KEY_SIZE; + + /* + * If the first chunk in the database is larger than the highest chunk + * received, then there is no gap. 
+ * + * If a gap does exist, check if it is time to do a re-request. If so, + * re-request every chunk that exists before the highest received. + */ + if (!force && (__rep_blob_cmp(NULL, &key, &high, NULL) > 0 || + (__rep_blob_cmp(NULL, &key, &high, NULL) == 0 && + offset > rep->gap_bl_hi_off))) { + lp->wait_ts = db_rep->request_gap; + __os_gettime(env, &lp->rcvd_ts, 1); + } else if (force || __rep_check_doreq(env, rep)) { + /* + * Re-request every chunk less than the highest one, plus the + * next blob chunk that we are expecting. The next expected + * blob chunk is requested in case the last blob chunk is lost + * in transit. + */ + do { + memset(&rbcr, 0, sizeof(__rep_blob_chunk_req_args)); + memcpy(&(rbcr.blob_sid), key.data, BLOB_ID_SIZE); + memcpy(&(rbcr.blob_id), + (u_int8_t *)key.data + BLOB_ID_SIZE, BLOB_ID_SIZE); + rbcr.offset = *(u_int64_t *)data.data; + rbcr.blob_fid = (u_int64_t)blob_fid; + msg.size = __REP_BLOB_CHUNK_REQ_SIZE; + msg.data = msgbuf; + RPRINT(env, (env, DB_VERB_REP_SYNC, +"blob_chunk_gap: Req file_id %llu, sdb_id %llu, blob_id %llu, offset %llu", + (long long)rbcr.blob_fid, (long long)rbcr.blob_sid, + (long long)rbcr.blob_id, (long long)rbcr.offset)); + __rep_blob_chunk_req_marshal(env, &rbcr, msg.data); + /* + * Note that peer-to-peer initialization is not + * supported for blobs. + */ + (void)__rep_send_message( + env, rep->master_id, + REP_BLOB_CHUNK_REQ, NULL, &msg, 0, 0); + /* + * Break after requesting the chunk after the highest + * one. + */ + if (__rep_blob_cmp(NULL, &key, &high, NULL) > 0 || + (__rep_blob_cmp(NULL, &key, &high, NULL) == 0 && + offset > rep->gap_bl_hi_off)) + break; + if ((ret = __dbc_get( + dbc, &key, &data, DB_NEXT)) != 0) { + if (ret == DB_NOTFOUND) { + ret = 0; + break; + } + goto err; + } + } while (1); + } + +err: if (dbc != NULL) + (void)__dbc_close(dbc); + + return (ret); +} + +/* + * __rep_blob_chunk_req + * Answer a request for a specific blob chunk. 
+ * + * PUBLIC: int __rep_blob_chunk_req __P((ENV *, int, DBT *)); + */ +int +__rep_blob_chunk_req(env, eid, rec) + ENV *env; + int eid; + DBT *rec; +{ + DB *dbp; + DBT msg; + DB_FH *fhp; + __rep_blob_chunk_args rbc; + __rep_blob_chunk_req_args rbcr; + int ret; + u_int8_t *chunk_buf, *msg_buf, *ptr; + + dbp = NULL; + fhp = NULL; + chunk_buf = msg_buf = NULL; + + if ((ret = + __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &msg_buf)) != 0) + goto err; + memset(&msg, 0, sizeof(DBT)); + msg.data = msg_buf; + msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE; + if ((ret = __os_malloc(env, MEGABYTE, &chunk_buf)) != 0) + goto err; + memset(&rbc, 0, sizeof(__rep_blob_chunk_args)); + rbc.data.data = chunk_buf; + rbc.data.ulen = MEGABYTE; + rbc.data.flags = DB_DBT_USERMEM; + + if ((ret = __rep_blob_chunk_req_unmarshal( + env, &rbcr, rec->data, rec->size, &ptr)) != 0) + goto err; + + RPRINT(env, (env, DB_VERB_REP_SYNC, + "blob_chunk_req: file_id %llu, sdbid %llu, id %llu, offset %llu", + (long long)rbcr.blob_fid, (long long)rbcr.blob_sid, + (long long)rbcr.blob_id, (long long)rbcr.offset)); + + rbc.blob_fid = rbcr.blob_fid; + rbc.blob_id = rbcr.blob_id; + rbc.blob_sid = rbcr.blob_sid; + rbc.offset = rbcr.offset; + if ((ret = __db_create_internal(&dbp, env, 0)) != 0) + goto err; + dbp->blob_file_id = (db_seq_t)rbcr.blob_fid; + dbp->blob_sdb_id = (db_seq_t)rbcr.blob_sid; + if ((ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir, + (db_seq_t)rbcr.blob_fid, (db_seq_t)rbcr.blob_sid)) != 0) + goto err; + if ((ret = __blob_file_open( + dbp, &fhp, (db_seq_t)rbcr.blob_id, DB_FOP_READONLY, 0)) != 0) { + /* + * The file may have been deleted between creating the + * list and sending the request. Send a message saying + * the file has been deleted. 
+ */ + if (ret == ENOENT) { + ret = 0; + F_SET(&rbc, BLOB_DELETE); + rbc.data.size = 0; + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE; + (void)__rep_send_message( + env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + goto err; + } + goto err; + } + if ((ret = __blob_file_read( + env, fhp, &rbc.data, (off_t)rbcr.offset, MEGABYTE)) != 0) + goto err; + DB_ASSERT(env, rbc.data.size <= MEGABYTE); + + /* + * In rare cases the blob file may have gotten shorter + * since the list was created. + */ + if (rbc.data.size == 0) + F_SET(&rbc, BLOB_CHUNK_FAIL); + __rep_blob_chunk_marshal(env, &rbc, msg.data); + msg.size = __REP_BLOB_CHUNK_SIZE + rbc.data.size; + (void)__rep_send_message(env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0); + +err: if (chunk_buf != NULL) + __os_free(env, chunk_buf); + if (msg_buf != NULL) + __os_free(env, msg_buf); + if (fhp != NULL) + (void)__os_closehandle(env, fhp); + if (dbp != 0) + (void)__db_close(dbp, NULL, 0); + return (ret); +} + +/* * Starts requesting pages for the next file in the list (if any), or if not, * proceeds to the next stage: requesting logs. * @@ -2404,19 +4111,25 @@ __rep_nextfile(env, eid, rep) DBT dbt; __rep_logreq_args lr_args; DB_LOG *dblp; + DB_REP *db_rep; + DELAYED_BLOB_LIST *dbl; LOG *lp; REGENV *renv; REGINFO *infop; __rep_fileinfo_args *curinfo, *rfp, rf; __rep_fileinfo_v6_args *rfpv6; - int *curbuf, ret; + __rep_fileinfo_v7_args *rfpv7; + int *curbuf, ret, view_partial; u_int8_t *buf, *info_ptr, lrbuf[__REP_LOGREQ_SIZE], *nextinfo; size_t len, msgsz; + char *name; void *rffree; infop = env->reginfo; renv = infop->primary; + db_rep = env->rep_handle; rfp = NULL; + dbl = NULL; /* * Always direct the next request to the master (at least nominally), @@ -2430,13 +4143,13 @@ __rep_nextfile(env, eid, rep) /* Set curinfo to next file and examine it. 
*/ info_ptr = R_ADDR(infop, rep->originfo_off + (rep->originfolen - rep->infolen)); + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (rep->infoversion < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. - */ if ((ret = __rep_fileinfo_v6_unmarshal(env, rep->infoversion, &rfpv6, info_ptr, rep->infolen, &nextinfo)) != 0) @@ -2444,8 +4157,18 @@ __rep_nextfile(env, eid, rep) memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args)); rf.dir.data = NULL; rf.dir.size = 0; + rf.blob_fid_lo = rf.blob_fid_hi = 0; rfp = &rf; rffree = rfpv6; + } else if (rep->infoversion < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, + rep->infoversion, &rfpv7, + info_ptr, rep->infolen, &nextinfo)) != 0) + return (ret); + memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args)); + rf.blob_fid_lo = rf.blob_fid_hi = 0; + rfp = &rf; + rffree = rfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, rep->infoversion, &rfp, info_ptr, @@ -2457,6 +4180,14 @@ __rep_nextfile(env, eid, rep) } rffree = rfp; } +#ifndef HAVE_64BIT_TYPES + if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) { + __db_errx(env, DB_STR("3705", + "Blobs require 64 integer compiler support.")); + __os_free(env, rffree); + return (DB_OPNOTSUP); + } +#endif rep->infolen -= (u_int32_t)(nextinfo - info_ptr); MUTEX_LOCK(env, renv->mtx_regenv); ret = __env_alloc(infop, sizeof(__rep_fileinfo_args) + @@ -2484,19 +4215,55 @@ __rep_nextfile(env, eid, rep) rfp->dir.data, rfp->dir.size); __os_free(env, rffree); - /* Skip over regular DB's in "abbreviated" internal inits. 
*/ - if (F_ISSET(rep, REP_F_ABBREVIATED) && + /* + * If a partial callback is set, invoke the callback to see if + * this file should be replicated. + */ + if (IS_VIEW_SITE(env) && curinfo->info.size > 0 && !FLD_ISSET(curinfo->db_flags, DB_AM_INMEM)) { + name = (char *)curinfo->info.data; + DB_ASSERT(env, db_rep->partial != NULL); + /* + * Always replicate system owned databases. + */ + if (IS_DB_FILE(name) && !IS_BLOB_META(name)) + view_partial = 1; + else if ((ret = __rep_call_partial(env, + name, &view_partial, 0, &dbl)) != 0) { + VPRINT(env, (env, DB_VERB_REP_SYNC, + "rep_nextfile: partial cb err %d for %s", + ret, name)); + return (ret); + } + /* + * dbl != NULL when we could not find the name of the + * database that owns a blob meta database. If that + * happens then it was never opened, which means it + * was not replicated, and as such neither should its + * bmd be replicated. + */ + if (dbl != NULL) { + view_partial = 0; + __os_free(env, dbl); + dbl = NULL; + } VPRINT(env, (env, DB_VERB_REP_SYNC, - "Skipping file %d in abbreviated internal init", - curinfo->filenum)); - MUTEX_LOCK(env, renv->mtx_regenv); - __env_alloc_free(infop, - R_ADDR(infop, rep->curinfo_off)); - MUTEX_UNLOCK(env, renv->mtx_regenv); - rep->curinfo_off = INVALID_ROFF; - rep->curfile++; - continue; + "rep_nextfile: %s file %s %d on view site.", + view_partial == 0 ? + "Skipping" : "Replicating", + name, curinfo->filenum)); + /* + * If we're skipping the file, move to the next one. + */ + if (view_partial == 0) { + MUTEX_LOCK(env, renv->mtx_regenv); + __env_alloc_free(infop, + R_ADDR(infop, rep->curinfo_off)); + MUTEX_UNLOCK(env, renv->mtx_regenv); + rep->curinfo_off = INVALID_ROFF; + rep->curfile++; + continue; + } } /* Request this file's pages. 
*/ @@ -2519,15 +4286,19 @@ __rep_nextfile(env, eid, rep) curinfo->uid.size + curinfo->info.size; if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0) return (ret); + /* + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. + */ if (rep->infoversion < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, rep->infoversion, (__rep_fileinfo_v6_args *)curinfo, buf, msgsz, &len); + else if (rep->infoversion < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, rep->infoversion, + (__rep_fileinfo_v7_args *)curinfo, buf, + msgsz, &len); else ret = __rep_fileinfo_marshal(env, rep->infoversion, curinfo, buf, msgsz, &len); @@ -2834,16 +4605,19 @@ __rep_pggap_req(env, rep, reqfp, gapflags) * new info into rep->finfo. Assert that the sizes never * change. The only thing this should do is change * the pgno field. Everything else remains the same. + * + * It is safe to cast to the old structs + * because the first part of the current + * struct matches the old structs. */ if (rep->infoversion < DB_REPVERSION_53) - /* - * It is safe to cast to the old struct - * because the first part of the current - * struct matches the old struct. - */ ret = __rep_fileinfo_v6_marshal(env, rep->infoversion, (__rep_fileinfo_v6_args *)tmpfp, buf, msgsz, &len); + else if (rep->infoversion < DB_REPVERSION_61) + ret = __rep_fileinfo_v7_marshal(env, rep->infoversion, + (__rep_fileinfo_v7_args *)tmpfp, buf, + msgsz, &len); else ret = __rep_fileinfo_marshal(env, rep->infoversion, tmpfp, buf, msgsz, &len); @@ -2865,6 +4639,94 @@ err: } /* + * __rep_blob_rereq - + * + * Re-request lost blob messages, such as REP_BLOB_CHUNK_REQ, REP_BLOB_ALL_REQ, + * or REP_BLOB_UPDATE_REQ. Note that the blob chunk gap database contains + * descriptions of the blob chunks that we are expecting to arrive. 
+ * + * Assumes the caller holds mtx_clientdb and rep_mutex. + * + * PUBLIC: int __rep_blob_rereq __P((ENV *, REP *)); + */ +int +__rep_blob_rereq(env, rep) + ENV *env; + REP *rep; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + REGINFO *infop; + __rep_fileinfo_args *rfp; + db_seq_t blob_fid; + int master, ret; + u_int32_t count; + + db_rep = env->rep_handle; + infop = env->reginfo; + rfp = NULL; + ret = 0; + + /* First check if the master is around to answer the re-request. */ + master = rep->master_id; + if (master == DB_EID_INVALID) { + (void)__rep_send_message(env, + DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0); + goto err; + } + + if (db_rep->blob_dbp == NULL && + (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) { + RPRINT(env, (env, DB_VERB_REP_SYNC, + "REP_BLOB_CHUNK: Client_dbinit %s", + db_strerror(ret))); + goto err; + } + + /* + * If the gap blob id is 0 then we either lost a REP_BLOB_ALL_REQ or + * a REP_BLOB_UPDATE_REQ message. Since we do not have the information + * to reconstruct a REP_BLOB_ALL_REQ message, reset the blob gap + * database and start over at the REP_BLOB_UPDATE_REQ stage. + * + * If the blob gap id is not 0, we lost a REP_BLOB_CHUNK_REQ message, + * so perform blob gap processing. + */ + ENV_GET_THREAD_INFO(env, ip); + if (rep->gap_bl_hi_id == 0) { + /* + * It takes a while to create the blob update message, so skip + * the first time it asks. + */ + if (rep->blob_rereq == 0) { + rep->blob_rereq = 1; + goto err; + } + rep->blob_rereq = 0; + if ((ret = __db_truncate( + db_rep->blob_dbp, ip, NULL, &count)) != 0) + goto err; + rep->blob_more_files = 1; + rep->last_blob_id = rep->prev_blob_id; + rep->last_blob_sid = rep->prev_blob_sid; + } + + GET_CURINFO(rep, infop, rfp); + GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret); + if (ret != 0) + goto err; + /* + * If there are entries in the blob gap database, __rep_blobdone + * will perform gap processing, otherwise it will send + * a REP_BLOB_UPDATE_REQ. 
+ */ + ret = __rep_blobdone(env, master, ip, rep, blob_fid, 1); + +err: + return (ret); +} + +/* * __rep_finfo_alloc - * Allocate and initialize a fileinfo structure. * @@ -3521,6 +5383,7 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg) { __rep_fileinfo_args *rfp, rf; __rep_fileinfo_v6_args *rfpv6; + __rep_fileinfo_v7_args *rfpv7; u_int8_t *next; int ret; void *rffree; @@ -3530,21 +5393,30 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg) rfpv6 = NULL; rffree = NULL; while (count-- > 0) { + /* + * Build a current struct by copying in the older + * version struct and then setting up the new fields. + * This is safe because all old fields are in the + * same location in the current struct. + */ if (version < DB_REPVERSION_53) { - /* - * Build a current struct by copying in the older - * version struct and then setting up the data_dir. - * This is safe because all old fields are in the - * same location in the current struct. - */ if ((ret = __rep_fileinfo_v6_unmarshal(env, version, &rfpv6, files, size, &next)) != 0) break; memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args)); rf.dir.data = NULL; rf.dir.size = 0; + rf.blob_fid_lo = rf.blob_fid_hi = 0; rfp = &rf; rffree = rfpv6; + } else if (version < DB_REPVERSION_61) { + if ((ret = __rep_fileinfo_v7_unmarshal(env, version, + &rfpv7, files, size, &next)) != 0) + break; + memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args)); + rf.blob_fid_lo = rf.blob_fid_hi = 0; + rfp = &rf; + rffree = rfpv7; } else { if ((ret = __rep_fileinfo_unmarshal(env, version, &rfp, files, size, &next)) != 0) @@ -3566,3 +5438,33 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg) __os_free(env, rffree); return (ret); } + +/* + * Initializes a FILE_LIST_CTX structure. + * + * Pass in a non-zero value for update_space to reserve space for + * update_args in the context's buffer. 
+ */ +static int +__rep_init_file_list_context(env, version, flags, update_space, context) + ENV *env; + u_int32_t version; + u_int32_t flags; + int update_space; + FILE_LIST_CTX *context; +{ + int ret; + + if ((ret = __os_calloc(env, 1, MEGABYTE, &context->buf)) != 0) + return (ret); + context->size = MEGABYTE; + context->count = 0; + context->version = version; + context->flags = flags; + /* Reserve space for update_args. */ + if (update_space) + context->fillptr = FIRST_FILE_PTR(context->buf); + else + context->fillptr = context->buf; + return (ret); +} |
