summaryrefslogtreecommitdiff
path: root/src/rep/rep_backup.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rep/rep_backup.c')
-rw-r--r--src/rep/rep_backup.c2148
1 files changed, 2025 insertions, 123 deletions
diff --git a/src/rep/rep_backup.c b/src/rep/rep_backup.c
index cfde7622..14bc63bb 100644
--- a/src/rep/rep_backup.c
+++ b/src/rep/rep_backup.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2004, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -9,6 +9,7 @@
#include "db_config.h"
#include "db_int.h"
+#include "dbinc/blob.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/fop.h"
@@ -26,21 +27,45 @@
* Note that the fileinfo for the first file in the list always appears at
* (constant) offset __REP_UPDATE_SIZE in the buffer.
*/
+#define FILE_CTX_INMEM_ONLY 0x01 /* List only in-memory named databases. */
typedef struct {
u_int8_t *buf; /* Buffer base address. */
u_int32_t size; /* Total allocated buffer size. */
u_int8_t *fillptr; /* Pointer to first unused space. */
u_int32_t count; /* Number of entries currently in list. */
u_int32_t version; /* Rep version of marshaled format. */
+ u_int32_t flags; /* Context flags (FILE_CTX_INMEM_ONLY). */
} FILE_LIST_CTX;
#define FIRST_FILE_PTR(buf) ((buf) + __REP_UPDATE_SIZE)
/*
+ * Flags used to show the state of blob files on the master in messages
+ * sent to the client.
+ */
+#define BLOB_DONE 0x01 /* Set in a blob update when no more files remain. */
+#define BLOB_DELETE 0x02 /* NOTE(review): presumably "blob deleted on master" -- confirm. */
+#define BLOB_CHUNK_FAIL 0x04 /* NOTE(review): presumably "chunk could not be read" -- confirm. */
+
+#define BLOB_ID_SIZE sizeof(db_seq_t) /* Bytes in one blob id. */
+#define BLOB_KEY_SIZE (2 * BLOB_ID_SIZE) /* Two ids; sizes keybuf in __rep_blob_update. */
+
+/*
* Function that performs any desired processing on a single file, as part of
* the traversal of a list of database files, such as with internal init.
*/
typedef int (FILE_WALK_FN) __P((ENV *, __rep_fileinfo_args *, void *));
+static int __rep_add_files_to_list __P((
+ ENV *, const char *, const char *, FILE_LIST_CTX *, const char **, int));
+static int __rep_blob_chunk_gap
+ __P((ENV *, int, DB_THREAD_INFO *, REP *, int *, db_seq_t, int));
+static int __rep_blob_cleanup __P((ENV *, REP *));
+static int __rep_blobdone
+ __P((ENV *, int, DB_THREAD_INFO *, REP *, db_seq_t, int));
+static int __rep_blob_find_files __P((ENV *, DB_THREAD_INFO *, const char *,
+ db_seq_t *, db_seq_t, db_seq_t, db_seq_t *, DBT *, size_t *, u_int32_t *));
+static int __rep_blob_sort_dirs __P((ENV *,
+ int (*)(const char *), char **, int, char ***, int *));
static FILE_WALK_FN __rep_check_uid;
static int __rep_clean_interrupted __P((ENV *));
static FILE_WALK_FN __rep_cleanup_nimdbs;
@@ -52,6 +77,8 @@ static int __rep_get_fileinfo __P((ENV *, const char *,
const char *, __rep_fileinfo_args *, u_int8_t *));
static int __rep_get_file_list __P((ENV *,
DB_FH *, u_int32_t, u_int32_t *, DBT *));
+static int __rep_init_file_list_context __P((ENV *,
+ u_int32_t, u_int32_t, int, FILE_LIST_CTX *));
static int __rep_is_replicated_db __P((const char *, const char *));
static int __rep_log_setup __P((ENV *,
REP *, u_int32_t, u_int32_t, DB_LSN *));
@@ -72,9 +99,12 @@ static FILE_WALK_FN __rep_remove_file;
static int __rep_remove_logs __P((ENV *));
static int __rep_remove_nimdbs __P((ENV *));
static int __rep_rollback __P((ENV *, DB_LSN *));
+static int __rep_select_blob_file __P((const char *));
+static int __rep_select_blob_sdb __P((const char *));
static int __rep_unlink_by_list __P((ENV *, u_int32_t,
u_int8_t *, u_int32_t, u_int32_t));
static FILE_WALK_FN __rep_unlink_file;
+static int __rep_walk_blob_dir __P((ENV *, FILE_LIST_CTX*));
static int __rep_walk_filelist __P((ENV *, u_int32_t, u_int8_t *,
u_int32_t, u_int32_t, FILE_WALK_FN *, void *));
static int __rep_walk_dir __P((ENV *, const char *, const char *,
@@ -129,14 +159,12 @@ __rep_update_req(env, rp)
dblp = env->lg_handle;
logc = NULL;
- if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0)
- goto err_noalloc;
- context.size = MEGABYTE;
- context.count = 0;
- context.version = rp->rep_version;
/* Reserve space for the update_args, and fill in file info. */
- context.fillptr = FIRST_FILE_PTR(context.buf);
+ if ((ret = __rep_init_file_list_context(env, rp->rep_version,
+ F_ISSET(rp, REPCTL_INMEM_ONLY) ? FILE_CTX_INMEM_ONLY : 0,
+ 1, &context)) != 0)
+ goto err_noalloc;
if ((ret = __rep_find_dbs(env, &context)) != 0)
goto err;
@@ -214,6 +242,472 @@ err_noalloc:
}
/*
+ * __rep_select_blob_file --
+ *	Selection callback handed to __rep_blob_sort_dirs.  Accepts blob
+ *	files, i.e. names that start with BLOB_FILE_PREFIX (__db.bl###).
+ *
+ *	Returns 1 when the name is a blob file, 0 otherwise.
+ */
+static int
+__rep_select_blob_file(file)
+	const char *file;
+{
+	return (strncmp(BLOB_FILE_PREFIX,
+	    file, strlen(BLOB_FILE_PREFIX)) == 0 ? 1 : 0);
+}
+
+/*
+ * __rep_select_blob_sdb --
+ *	Selection callback handed to __rep_blob_sort_dirs.  Accepts blob
+ *	subdatabase directories (names of the form __db###).  Blob files
+ *	and the blob meta database are explicitly excluded.
+ *
+ *	Returns 1 when the name is selected, 0 otherwise.
+ */
+static int
+__rep_select_blob_sdb(file)
+	const char *file;
+{
+	/* The name has to carry the blob directory prefix. */
+	if (strncmp(BLOB_DIR_PREFIX, file, strlen(BLOB_DIR_PREFIX)) != 0)
+		return (0);
+
+	/* Blob files are not subdatabase directories ... */
+	if (strncmp(BLOB_FILE_PREFIX, file, strlen(BLOB_FILE_PREFIX)) == 0)
+		return (0);
+
+	/* ... and neither is the blob meta database. */
+	if (strcmp(BLOB_META_FILE_NAME, file) == 0)
+		return (0);
+
+	return (1);
+}
+
+/*
+ * __rep_blob_sort_dirs --
+ *	Build a sorted list of the entries of dirs that select_fn accepts.
+ *
+ *	On success *sorted is a newly allocated array holding *sorted_cnt
+ *	pointers in ascending strcmp order.  The pointers refer to the
+ *	strings owned by dirs (nothing is copied), so the caller releases
+ *	only the array itself.  If the allocation fails its error is
+ *	returned, *sorted is NULL and *sorted_cnt is 0.
+ */
+static int
+__rep_blob_sort_dirs(env, select_fn, dirs, dirs_cnt, sorted, sorted_cnt)
+	ENV *env;
+	int (*select_fn) __P((const char *));
+	char **dirs;
+	int dirs_cnt;
+	char ***sorted;
+	int *sorted_cnt;
+{
+	char **picked, *hold;
+	int idx, limit, npicked, ret, swapped;
+
+	*sorted = NULL;
+	*sorted_cnt = 0;
+
+	if ((ret = __os_malloc(env,
+	    (sizeof(char *) * (unsigned int)dirs_cnt), &picked)) != 0)
+		return (ret);
+
+	/* Keep only the names the selection function accepts. */
+	npicked = 0;
+	for (idx = 0; idx < dirs_cnt; idx++)
+		if (select_fn(dirs[idx]))
+			picked[npicked++] = dirs[idx];
+
+	/*
+	 * Directory listings usually arrive sorted or nearly so, which is
+	 * the good case for Bubble Sort: each pass moves the largest
+	 * remaining name to the end, and the first pass that exchanges
+	 * nothing ends the sort.
+	 */
+	for (limit = npicked; limit > 1; limit--) {
+		swapped = 0;
+		for (idx = 1; idx < limit; idx++) {
+			if (strcmp(picked[idx - 1], picked[idx]) <= 0)
+				continue;
+			hold = picked[idx - 1];
+			picked[idx - 1] = picked[idx];
+			picked[idx] = hold;
+			swapped = 1;
+		}
+		if (!swapped)
+			break;
+	}
+
+	*sorted = picked;
+	*sorted_cnt = npicked;
+
+	return (0);
+}
+
+#define BLOB_THROTTLE_DEFAULT (10 * MEGABYTE) /* Used when the computed throttle is 0. */
+
+/*
+ * __rep_blob_update_req
+ * Send a list of blob files, starting after the blob id and sub-database
+ * id sent in the BLOB_UPDATE_REQ message.
+ *
+ * The request in rec is unmarshaled and the blob directory of the
+ * requested file id is searched, one sub-database directory at a time in
+ * ascending id order.  The files found are described in a single
+ * REP_BLOB_UPDATE message that is broadcast.  Searching stops once the
+ * combined size of the listed files reaches the throttle; BLOB_DONE is
+ * set in the reply when there is nothing more to list.
+ *
+ * Returns 0 on success or the first error encountered, in which case no
+ * message is sent.  The result of the send itself is ignored.
+ *
+ * PUBLIC: int __rep_blob_update_req __P((ENV *, DB_THREAD_INFO *, DBT *));
+ */
+int
+__rep_blob_update_req(env, ip, rec)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ DBT *rec;
+{
+ DBT rbudbt;
+ REP *rep;
+ __rep_blob_update_args rbu;
+ __rep_blob_update_req_args rbur;
+ db_seq_t blob_fid, blob_id, blob_sdb, tmp;
+ int cur, dirs_cnt, ret, sdb_cnt;
+ size_t sent;
+ char *blob_sub_dir, *dir, **dirs, **sdb;
+ u_int32_t num_blobs, throttle;
+ u_int8_t *ptr;
+
+ memset(&rbu, 0, sizeof(__rep_blob_update_args));
+ memset(&rbudbt, 0, sizeof(DBT));
+ blob_sub_dir = dir = NULL;
+ dirs = sdb = NULL;
+ sent = 0;
+ num_blobs = 0;
+ cur = dirs_cnt = sdb_cnt = 0;
+ rep = env->rep_handle->region;
+ throttle = rep->gbytes * GIGABYTE + rep->bytes; /* NOTE(review): throttle is 32-bit -- confirm this cannot wrap. */
+ if (throttle == 0)
+ throttle = BLOB_THROTTLE_DEFAULT;
+
+ if ((ret = __rep_blob_update_req_unmarshal(
+ env, &rbur, rec->data, rec->size, &ptr)) != 0)
+ goto err;
+
+ RPRINT(env, (env, DB_VERB_REP_SYNC,
+"blob_update_req: file_id %llu sdb_id %llu blob_id %llu highest %llu",
+ (long long)rbur.blob_fid, (long long)rbur.blob_sid,
+ (long long)rbur.blob_id, (long long)rbur.highest_id));
+
+ rbu.blob_fid = rbur.blob_fid;
+
+ if ((ret = __os_malloc(env, MEGABYTE, &rbudbt.data)) != 0)
+ goto err;
+ rbudbt.ulen = MEGABYTE;
+ rbudbt.size = __REP_BLOB_UPDATE_SIZE; /* Leave room for the header, marshaled last. */
+
+ blob_fid = (db_seq_t)rbur.blob_fid;
+ blob_sdb = (db_seq_t)rbur.blob_sid;
+ blob_id = (db_seq_t)rbur.blob_id;
+
+ /*
+  * Find the first blob file if it is unknown.  The find_sdb label is
+  * also the re-entry point used below to advance to the next
+  * sub-database directory.
+  */
+ if (blob_id == 0 && blob_sdb == 0) {
+find_sdb: if (dirs == NULL) { /* The directory is listed only once. */
+ if ((ret = __blob_make_sub_dir(
+ env, &blob_sub_dir, blob_fid, 0)) != 0)
+ goto err;
+ if ((ret = __db_appname(
+ env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0)
+ goto err;
+ /* If no directory, there are no blobs to send. */
+ if (__os_exists(env, dir, NULL) != 0)
+ goto filedone;
+
+ if ((ret = __os_dirlist(
+ env, dir, 1, &dirs, &dirs_cnt)) != 0)
+ goto err;
+
+ if (dirs_cnt == 0)
+ goto filedone;
+
+ if ((ret = __rep_blob_sort_dirs(
+ env, __rep_select_blob_sdb,
+ dirs, dirs_cnt, &sdb, &sdb_cnt)) != 0)
+ goto err;
+ }
+ /*
+ * Iterate through the list of subdirectories, until we find
+ * one that has an id larger than the current subdirectory id.
+ */
+ while (cur < sdb_cnt) {
+ if ((ret = __blob_path_to_dir_ids(
+ env, sdb[cur], &tmp, NULL)) != 0)
+ goto err;
+ if (blob_sdb < tmp) {
+ blob_sdb = tmp;
+ break;
+ }
+ cur++;
+ }
+ /* Check if no more subdirectories to search */
+ if (sdb_cnt != 0 && cur == sdb_cnt)
+ goto filedone;
+ if (dir != NULL)
+ __os_free(env, dir);
+ dir = NULL;
+ if (blob_sub_dir != NULL)
+ __os_free(env, blob_sub_dir);
+ blob_sub_dir = NULL;
+ }
+
+ if (blob_sub_dir == NULL && (ret =
+ __blob_make_sub_dir(env, &blob_sub_dir, blob_fid, blob_sdb)) != 0)
+ goto err;
+
+ if (dir == NULL && (ret = __db_appname(
+ env, DB_APP_BLOB, blob_sub_dir, NULL, &dir)) != 0)
+ goto err;
+ /* Search the current directory for blob files with id > blob_id. */
+ if ((ret = __rep_blob_find_files(
+ env, ip, dir, &blob_id, blob_sdb, blob_fid,
+ (db_seq_t *)&rbur.highest_id, &rbudbt, &sent, &num_blobs)) != 0)
+ goto err;
+
+ /*
+ * If we have not reached the send limit, and there are still
+ * directories to search, then search the next directory.
+ */
+ if (sent < throttle) {
+ if (blob_sdb != 0) {
+ rbur.highest_id = 0; /* Forces a re-read for the next sub-database. */
+ blob_id = 0;
+ __os_free(env, blob_sub_dir);
+ blob_sub_dir = NULL;
+ __os_free(env, dir);
+ dir = NULL;
+ goto find_sdb;
+ } else {
+ /* Mark as the end of the files. */
+filedone: F_SET(&rbu, BLOB_DONE);
+ rbur.highest_id = 0;
+ }
+ } else
+ STAT(rep->stat.st_nthrottles++);
+
+ rbu.num_blobs = num_blobs;
+ rbu.highest_id = rbur.highest_id;
+ __rep_blob_update_marshal(env, &rbu, rbudbt.data);
+ RPRINT(env, (env, DB_VERB_REP_SYNC,
+ "Sending blob_update: file_id %llu, num_blobs %lu, flags %lu",
+ (long long)rbu.blob_fid,
+ (long)num_blobs, (unsigned long)rbu.flags));
+ (void)__rep_send_message(
+ env, DB_EID_BROADCAST, REP_BLOB_UPDATE, NULL, &rbudbt, 0, 0);
+
+err: if (sdb != NULL) /* Also the normal exit path; sdb only aliases the strings in dirs. */
+ __os_free(env, sdb);
+ if (dirs != NULL)
+ __os_dirfree(env, dirs, dirs_cnt);
+ if (dir != NULL)
+ __os_free(env, dir);
+ if (blob_sub_dir != NULL)
+ __os_free(env, blob_sub_dir);
+ if (rbudbt.data != NULL)
+ __os_free(env, rbudbt.data);
+ return (ret);
+}
+
+/*
+ * __rep_blob_find_files
+ *
+ * Search a directory for blob files, starting with the given blob id and
+ * sub-database id. Add information for each blob to the message buffer until
+ * there are no more files, or it has reached the maximum send amount in terms
+ * of combined blob files size.
+ *
+ * This search is complicated because the blobs have to be sent in order by id,
+ * but there can be huge holes between a blob file and the one with the next
+ * highest id, so iterating through the ids looking to see if the file exists
+ * for each id will take too long. The solution is to walk the directory
+ * hierarchy in order, reading every file in that directory, sorting them by
+ * id, and adding them to the update list.
+ *
+ * Parameters:
+ *   dir      - directory to search.
+ *   blob_id  - in: the last id already handled; out: advanced as the
+ *              search progresses.
+ *   blob_sid - sub-database id stored in every entry added.
+ *   blob_fid - file id, used with blob_sid to look up the highest id.
+ *   highest  - in/out: if 0 on entry it is read from the blob meta
+ *              database and incremented, making it an exclusive bound.
+ *   buf      - message buffer; entries are appended at data + size and
+ *              the buffer is doubled when it is about to run out.
+ *   sent     - in/out: running total of the sizes of the files listed.
+ *   num      - in/out: number of entries appended.
+ *
+ * Returns 0 on success, including when the throttle stops the search
+ * early, or the first error encountered.
+ *
+ * NOTE(review): path is allocated as strlen(dir) + MAX_BLOB_PATH_SZ and
+ * filled with sprintf; confirm MAX_BLOB_PATH_SZ covers the subdirectory,
+ * both separators and the file name.
+ */
+static int
+__rep_blob_find_files(
+ env, ip, dir, blob_id, blob_sid, blob_fid, highest, buf, sent, num)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ const char *dir;
+ db_seq_t *blob_id;
+ db_seq_t blob_sid;
+ db_seq_t blob_fid;
+ db_seq_t *highest;
+ DBT *buf;
+ size_t *sent;
+ u_int32_t *num;
+{
+ DB *bmd;
+ DB_FH *fhp;
+ DB_TXN *txn;
+ REP *rep;
+ __rep_blob_file_args rbf;
+ char blob_path[MAX_BLOB_PATH_SZ], **dirs, **files, *path, *ptr;
+ db_seq_t tmp;
+ int blob_path_len, cur, depth, dirs_cnt, files_cnt, ret;
+ off_t blob_size;
+ size_t len;
+ u_int32_t bytes, mbytes, throttle;
+
+ bmd = NULL;
+ txn = NULL;
+ fhp = NULL;
+ path = NULL;
+ dirs = files = NULL;
+ dirs_cnt = files_cnt = 0;
+ rbf.blob_sid = (u_int64_t)blob_sid;
+ rep = env->rep_handle->region;
+ throttle = rep->gbytes * GIGABYTE + rep->bytes; /* NOTE(review): throttle is 32-bit -- confirm this cannot wrap. */
+ if (throttle == 0)
+ throttle = BLOB_THROTTLE_DEFAULT;
+
+ if ((ret = __os_malloc(
+ env, strlen(dir) + MAX_BLOB_PATH_SZ, &path)) != 0)
+ goto err;
+
+ /*
+ * Read the highest possible blob id from the blob meta database, so
+ * we know when to stop looking for files for this database. The
+ * highest value is reset every time we switch to a new subdatabase.
+ */
+ if (*highest == 0) {
+ if ((ret = __db_create_internal(&bmd, env, 0)) != 0)
+ goto err;
+
+ if ((ret = __txn_begin(
+ env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0)
+ goto err;
+
+ bmd->blob_file_id = blob_fid;
+ bmd->blob_sdb_id = blob_sid;
+ if ((ret = __blob_highest_id(bmd, txn, highest) ) != 0)
+ goto err;
+
+ if ((ret = __txn_abort(txn)) != 0)
+ goto err;
+ txn = NULL;
+ if ((ret = __db_close(bmd, NULL, 0)) != 0)
+ goto err;
+ bmd = NULL;
+ (*highest)++; /* The loop below treats it as an exclusive bound. */
+ }
+
+ (*blob_id)++; /* Start with the id after the last one handled. */
+ while (*sent < throttle && *blob_id < *highest) {
+ memset(blob_path, 0, MAX_BLOB_PATH_SZ);
+ blob_path_len = depth = 0;
+
+ /* Calculate the subdirectory from the blob id. */
+ __blob_calculate_dirs(
+ *blob_id, blob_path, &blob_path_len, &depth);
+ if (blob_path_len != 0) {
+ (void)sprintf(path, "%s%c%s%c",
+ dir, PATH_SEPARATOR[0], blob_path, PATH_SEPARATOR[0]);
+ } else
+ (void)sprintf(path, "%s", dir);
+ len = strlen(path);
+
+ /* If the sub-directory does not exist, look for the next. */
+ if (__os_exists(env, path, NULL) != 0) {
+ (*blob_id) +=
+ BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS);
+ continue;
+ }
+
+ /* Get a list of all the blob files, sorted by id. */
+ if ((ret = __os_dirlist(env, path, 0, &dirs, &dirs_cnt)) != 0)
+ goto err;
+
+ if ((ret = __rep_blob_sort_dirs(env, __rep_select_blob_file,
+ dirs, dirs_cnt, &files, &files_cnt)) != 0)
+ goto err;
+
+ /*
+ * Find the first blob file with an id greater than or equal to
+ * the last id.
+ */
+ for (cur = 0; cur < files_cnt; cur++) {
+ ptr = files[cur];
+ ptr += strlen(BLOB_FILE_PREFIX);
+ if ((ret = __blob_str_to_id(
+ env, (const char **)&ptr, &tmp)) != 0)
+ goto err;
+ DB_ASSERT(env, tmp != 0);
+ if (tmp >= *blob_id)
+ break;
+ }
+
+ /* Add each remaining blob file to the message buffer. */
+ while (cur < files_cnt) {
+ /* Get the blob id from the current file name. */
+ (void)sprintf(path + len, "%s", files[cur]);
+ ptr = path + len + strlen(BLOB_FILE_PREFIX);
+ if ((ret = __blob_str_to_id(
+ env, (const char **)&ptr, blob_id)) != 0)
+ goto err;
+ rbf.blob_id = (u_int64_t)*blob_id;
+ /* Open the file and get its size. */
+ if ((ret = __os_open(
+ env, path, 0, DB_OSO_RDONLY, 0, &fhp)) != 0) {
+ if (ret == ENOENT) { /* The file vanished after the listing was taken. */
+ ret = 0;
+ RPRINT(env, (env, DB_VERB_REP_SYNC,
+ "blob_update blob file: %llu deleted, skipping.",
+ (long long)rbf.blob_id));
+ cur++;
+ continue;
+ }
+ goto err;
+ }
+ if ((ret = __os_ioinfo(
+ env, path, fhp, &mbytes, &bytes, NULL)) != 0)
+ goto err;
+ if ((ret =__os_closehandle(env, fhp)) != 0)
+ goto err;
+ fhp = NULL;
+ blob_size = ((off_t)mbytes * (off_t)MEGABYTE) + bytes;
+ rbf.blob_size = (u_int64_t)blob_size;
+ if (blob_size > UINT32_MAX) /* Too big to add up: force the throttle to trip. */
+ (*sent) = throttle + 1;
+ else {
+ if (((*sent) + (size_t)blob_size) < (*sent)) /* The running total would wrap. */
+ (*sent) = throttle + 1;
+ else
+ (*sent) += (size_t)blob_size;
+ }
+ __rep_blob_file_marshal(
+ env, &rbf, (u_int8_t *)buf->data + buf->size);
+ (*num)++;
+ buf->size += __REP_BLOB_FILE_SIZE;
+ RPRINT(env, (env, DB_VERB_REP_SYNC,
+ "blob_update adding: blob_sid %llu, blob_id %llu blob_size %llu",
+ (long long)rbf.blob_sid,
+ (long long)rbf.blob_id, (long long)rbf.blob_size));
+ if ((*sent) > throttle)
+ goto err; /* Not a failure: ret is 0 here. */
+
+ /* Resize if there is not enough space to grow. */
+ if (buf->size > (buf->ulen - __REP_BLOB_FILE_SIZE)) {
+ if ((ret = __os_realloc(
+ env, buf->ulen * 2, &buf->data)) != 0)
+ goto err;
+ buf->ulen *= 2;
+ }
+ cur++;
+ }
+ /*
+ * Move to the next directory of blob files by setting the blob
+ * id to the next largest possible value.
+ */
+ (*blob_id) += BLOB_DIR_ELEMS - (*blob_id % BLOB_DIR_ELEMS);
+ __os_free(env, files);
+ files = NULL;
+ __os_dirfree(env, dirs, dirs_cnt);
+ dirs = NULL;
+ }
+err: /* Shared cleanup for the success and failure paths. */
+ if (path != NULL)
+ __os_free(env, path);
+ if (files != NULL)
+ __os_free(env, files);
+ if (dirs != NULL)
+ __os_dirfree(env, dirs, dirs_cnt);
+ if (fhp != NULL)
+ (void)__os_closehandle(env, fhp);
+ if (txn != NULL)
+ (void)__txn_abort(txn);
+ if (bmd != NULL)
+ (void)__db_close(bmd, NULL, 0);
+
+ return (ret);
+}
+
+/*
* __rep_find_dbs -
* Walk through all the named files/databases including those in the
* environment or data_dirs and those that in named and in-memory. We
@@ -240,7 +734,8 @@ __rep_find_dbs(env, context)
* replicated user databases. If the application has a metadata_dir,
* this will also find any persistent internal system databases.
*/
- if (dbenv->db_data_dir != NULL) {
+ if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) &&
+ dbenv->db_data_dir != NULL) {
for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) {
if ((ret = __db_appname(env,
DB_APP_NONE, *ddir, NULL, &real_dir)) != 0)
@@ -252,16 +747,24 @@ __rep_find_dbs(env, context)
real_dir = NULL;
}
}
+
/*
* Walk the environment directory. If the application doesn't
* have a metadata_dir, this will return persistent internal system
* databases. If the application doesn't have a separate data
* directory, this will also return all user databases.
*/
- if (ret == 0)
+ if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0)
ret = __rep_walk_dir(env, env->db_home, NULL, context);
- /* Now, collect any in-memory named databases. */
+ /* Gather the databases in the blob directory. */
+ if (!F_ISSET(context, FILE_CTX_INMEM_ONLY) && ret == 0)
+ ret = __rep_walk_blob_dir(env, context);
+
+ /*
+ * Now, collect any in-memory named databases. We do this no
+ * matter if the INMEM_ONLY flag is set or not.
+ */
if (ret == 0)
ret = __rep_walk_dir(env, NULL, NULL, context);
@@ -271,6 +774,148 @@ __rep_find_dbs(env, context)
}
/*
+ * __rep_walk_blob_dir --
+ *
+ * The blob directory hierarchy consists of a top layer that contains the
+ * blob meta database (BMD) and a set of blob directories (BLDIR).
+ * Each BLDIR corresponds to a database file. If the database file doesn't
+ * contain subdatabases, the BLDIR contains a BMD and blob files. If the
+ * database file contains subdatabases, the BLDIR contains a BLSDIR
+ * subdirectory for each subdatabase. Each BLSDIR contains a BMD and blob
+ * files.
+ *
+ * This function walks the blob directory hierarchy and records any BMD.
+ * It first checks if the top level BMD exists, and if it does searches
+ * the first and second layers of the hierarchy for BMDs.
+ *
+ * Every BMD found is added to the file list in context, named by its
+ * path relative to the blob directory.
+ *
+ * Returns 0 on success, including when the environment has no top level
+ * BMD, or the first error from building a path, listing a directory or
+ * adding a file to the list.  A directory that merely lacks a BMD is not
+ * an error.  Entries whose relative BMD path would not fit in
+ * MAX_BLOB_PATH_SZ bytes are skipped.
+ */
+static int
+__rep_walk_blob_dir(env, context)
+	ENV *env;
+	FILE_LIST_CTX *context;
+{
+	int cnt, cnt2, i, j, ret;
+	size_t bmd_len, dir_len, name_sz;
+	char *blob_dir, *blob_sub, **dirs, *name, *name2, **subdirs;
+	char blob_sub_buf[MAX_BLOB_PATH_SZ];
+	const char *bmd, *dirp;
+
+	cnt = cnt2 = 0;
+	blob_dir = name = name2 = NULL;
+	dirs = subdirs = NULL;
+	bmd = BLOB_META_FILE_NAME;
+	bmd_len = strlen(bmd);
+	/* __rep_add_files_to_list takes an array of names; this is a 1-array. */
+	blob_sub = blob_sub_buf;
+
+	if ((ret = __db_appname(
+	    env, DB_APP_BLOB, BLOB_META_FILE_NAME, &dirp, &name)) != 0)
+		goto err;
+
+	/*
+	 * If the main blob meta database does not exist, then no databases in
+	 * the environment supports blobs.  That is not an error.
+	 */
+	if (__os_exists(env, name, NULL) != 0) {
+		ret = 0;
+		goto err;
+	}
+
+	/* Get the blob directory. */
+	if ((ret = __db_appname(
+	    env, DB_APP_BLOB, NULL, &dirp, &blob_dir)) != 0)
+		goto err;
+
+	/* Record the top level BMD. */
+	if ((ret = __rep_add_files_to_list(
+	    env, blob_dir, NULL, context, &bmd, 1)) != 0)
+		goto err;
+
+	if ((ret = __os_dirlist(env, blob_dir, 1, &dirs, &cnt)) != 0)
+		goto err;
+
+	/*
+	 * Both scratch buffers for absolute paths get the same capacity:
+	 * the blob directory, a separator, and a relative path of at most
+	 * MAX_BLOB_PATH_SZ bytes (terminator included), plus one spare byte.
+	 * The length checks in the loops below guarantee that nothing longer
+	 * is ever formatted into them.
+	 */
+	__os_free(env, name);
+	name = NULL;
+	name_sz = strlen(blob_dir) + MAX_BLOB_PATH_SZ + 2;
+	if ((ret = __os_malloc(env, name_sz, &name)) != 0)
+		goto err;
+
+	for (i = 0; i < cnt; i++) {
+		/*
+		 * Skip blob files and the top level BMD
+		 * (which was handled above).
+		 */
+		if (IS_BLOB_META(dirs[i]) || IS_BLOB_FILE(dirs[i]))
+			continue;
+
+		/*
+		 * The relative path "dir/BMD" has to fit in blob_sub_buf.
+		 * A name that long cannot be a blob directory, so skip it
+		 * rather than overrun the buffers.
+		 */
+		dir_len = strlen(dirs[i]);
+		if (dir_len + bmd_len + 2 > sizeof(blob_sub_buf))
+			continue;
+
+		(void)snprintf(name, name_sz, "%s%c%s%c%s", blob_dir,
+		    PATH_SEPARATOR[0], dirs[i], PATH_SEPARATOR[0], bmd);
+		/*
+		 * If a blob meta database exists, add it to the list, and move
+		 * on to the next directory, otherwise get a directory list and
+		 * check the second layer for BMD. If a directory contains a
+		 * BMD, then it cannot contain subdirectories with BMD.
+		 */
+		if (__os_exists(env, name, NULL) == 0) {
+			(void)snprintf(blob_sub_buf, sizeof(blob_sub_buf),
+			    "%s%c%s", dirs[i], PATH_SEPARATOR[0], bmd);
+			if ((ret = __rep_add_files_to_list(env, blob_dir,
+			    NULL, context, (const char **)&blob_sub, 1)) != 0)
+				goto err;
+			continue;
+		}
+
+		(void)snprintf(name, name_sz, "%s%c%s",
+		    blob_dir, PATH_SEPARATOR[0], dirs[i]);
+		if ((ret = __os_dirlist(env, name, 1, &subdirs, &cnt2)) != 0)
+			goto err;
+		if (name2 == NULL &&
+		    (ret = __os_malloc(env, name_sz, &name2)) != 0)
+			goto err;
+		for (j = 0; j < cnt2; j++) {
+			if (IS_BLOB_FILE(subdirs[j]))
+				continue;
+			/* Same bound as above for "dir/subdir/BMD". */
+			if (dir_len + strlen(subdirs[j]) + bmd_len + 3 >
+			    sizeof(blob_sub_buf))
+				continue;
+			(void)snprintf(name2, name_sz, "%s%c%s%c%s",
+			    name, PATH_SEPARATOR[0], subdirs[j],
+			    PATH_SEPARATOR[0], bmd);
+			/*
+			 * A subdirectory without a BMD is simply skipped.
+			 * The probe result must not be stored in ret, or a
+			 * missing BMD in the last subdirectory examined
+			 * would be returned to the caller as an error.
+			 */
+			if (__os_exists(env, name2, NULL) != 0)
+				continue;
+			(void)snprintf(blob_sub_buf, sizeof(blob_sub_buf),
+			    "%s%c%s%c%s", dirs[i], PATH_SEPARATOR[0],
+			    subdirs[j], PATH_SEPARATOR[0], bmd);
+			if ((ret = __rep_add_files_to_list(
+			    env, blob_dir, NULL, context,
+			    (const char **)&blob_sub, 1)) != 0)
+				goto err;
+		}
+		__os_dirfree(env, subdirs, cnt2);
+		subdirs = NULL;
+	}
+
+err:	if (name != NULL)
+		__os_free(env, name);
+	if (name2 != NULL)
+		__os_free(env, name2);
+	if (blob_dir != NULL)
+		__os_free(env, blob_dir);
+	if (dirs != NULL)
+		__os_dirfree(env, dirs, cnt);
+	if (subdirs != NULL)
+		__os_dirfree(env, subdirs, cnt2);
+	return (ret);
+}
+
+/*
* __rep_walk_dir --
*
* This is the routine that walks a directory and fills in the structures
@@ -284,11 +929,8 @@ __rep_walk_dir(env, dir, datadir, context)
const char *dir, *datadir;
FILE_LIST_CTX *context;
{
- __rep_fileinfo_args tmpfp;
- size_t avail, len;
- int cnt, first_file, i, ret;
- u_int8_t uid[DB_FILE_ID_LEN];
- char *file, **names, *subdb;
+ int cnt, ret;
+ char **names;
if (dir == NULL) {
VPRINT(env, (env, DB_VERB_REP_SYNC,
@@ -304,7 +946,34 @@ __rep_walk_dir(env, dir, datadir, context)
}
VPRINT(env, (env, DB_VERB_REP_SYNC, "Walk_dir: Dir %s has %d files",
(dir == NULL) ? "INMEM" : dir, cnt));
+ ret = __rep_add_files_to_list(
+ env, dir, datadir, context, (const char **)names, cnt);
+
+ __os_dirfree(env, names, cnt);
+ return (ret);
+}
+
+/*
+ * __rep_add_files_to_list --
+ *
+ * Add the given files to the file list.
+ */
+static int
+__rep_add_files_to_list(env, dir, datadir, context, names, cnt)
+ ENV *env;
+ const char *dir, *datadir;
+ FILE_LIST_CTX *context;
+ const char **names;
+ int cnt;
+{
+ __rep_fileinfo_args tmpfp;
+ size_t avail, len;
+ int first_file, i, ret;
+ u_int8_t uid[DB_FILE_ID_LEN];
+ const char *file, *subdb;
+
first_file = 1;
+ ret = 0;
for (i = 0; i < cnt; i++) {
VPRINT(env, (env, DB_VERB_REP_SYNC,
"Walk_dir: File %d name: %s", i, names[i]));
@@ -372,15 +1041,19 @@ __rep_walk_dir(env, dir, datadir, context)
DB_SET_DBT(tmpfp.uid, uid, DB_FILE_ID_LEN);
retry: avail = (size_t)(&context->buf[context->size] -
context->fillptr);
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
+ */
if (context->version < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env, context->version,
(__rep_fileinfo_v6_args *)&tmpfp,
context->fillptr, avail, &len);
+ else if (context->version < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env, context->version,
+ (__rep_fileinfo_v7_args *)&tmpfp,
+ context->fillptr, avail, &len);
else
ret = __rep_fileinfo_marshal(env, context->version,
&tmpfp, context->fillptr, avail, &len);
@@ -409,9 +1082,7 @@ retry: avail = (size_t)(&context->buf[context->size] -
*/
context->fillptr += len;
}
-err:
- __os_dirfree(env, names, cnt);
- return (ret);
+err: return (ret);
}
/*
@@ -430,7 +1101,7 @@ __rep_is_replicated_db(name, dir)
/*
* Remaining things that don't have a "__db" prefix are eligible.
*/
- if (!IS_DB_FILE(name))
+ if (!IS_DB_FILE(name) || IS_BLOB_META(name))
return (1);
/* Here, we know we have a "__db" name. */
@@ -470,7 +1141,7 @@ __rep_check_uid(env, rfp, uid)
if (memcmp(rfp->uid.data, uid, DB_FILE_ID_LEN) == 0) {
VPRINT(env, (env, DB_VERB_REP_SYNC,
"Check_uid: Found matching file."));
- ret = DB_KEYEXIST;
+ ret = USR_ERR(env, DB_KEYEXIST);
}
return (ret);
@@ -489,6 +1160,7 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid)
DB_THREAD_INFO *ip;
PAGE *pagep;
int lorder, ret, t_ret;
+ u_int32_t flags;
dbp = NULL;
dbc = NULL;
@@ -503,11 +1175,15 @@ __rep_get_fileinfo(env, file, subdb, rfp, uid)
* database handles would block the master from handling UPDATE_REQ.
*/
F_SET(dbp, DB_AM_RECOVER);
- if ((ret = __db_open(dbp, ip, NULL, file, subdb, DB_UNKNOWN,
- DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0),
- 0, PGNO_BASE_MD)) != 0)
+ flags = DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0);
+ if (file != NULL && IS_BLOB_META(file))
+ LF_SET(DB_INTERNAL_BLOB_DB);
+ if ((ret = __db_open(dbp, ip, NULL,
+ file, subdb, DB_UNKNOWN, flags, 0, PGNO_BASE_MD)) != 0)
goto err;
+ SET_LO_HI_VAR(dbp->blob_file_id, rfp->blob_fid_lo, rfp->blob_fid_hi);
+
if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
goto err;
if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, ip, dbc->txn,
@@ -574,6 +1250,7 @@ __rep_page_req(env, ip, eid, rp, rec)
{
__rep_fileinfo_args *msgfp, msgf;
__rep_fileinfo_v6_args *msgfpv6;
+ __rep_fileinfo_v7_args *msgfpv7;
DB_MPOOLFILE *mpf;
DB_REP *db_rep;
REP *rep;
@@ -584,21 +1261,30 @@ __rep_page_req(env, ip, eid, rp, rec)
db_rep = env->rep_handle;
rep = db_rep->region;
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (rp->rep_version < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version,
&msgfpv6, rec->data, rec->size, &next)) != 0)
return (ret);
memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args));
msgf.dir.data = NULL;
msgf.dir.size = 0;
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
msgfp = &msgf;
msgfree = msgfpv6;
+ } else if (rp->rep_version < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version,
+ &msgfpv7, rec->data, rec->size, &next)) != 0)
+ return (ret);
+ memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args));
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
+ msgfp = &msgf;
+ msgfree = msgfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version,
&msgfp, rec->data, rec->size, &next)) != 0)
@@ -624,7 +1310,7 @@ __rep_page_req(env, ip, eid, rp, rec)
(void)__rep_send_message(env, eid, REP_FILE_FAIL,
NULL, rec, 0, 0);
else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto err;
}
@@ -738,7 +1424,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
#ifdef HAVE_QUEUE
if ((ret = __qam_fget(qdbc, &p, 0, &pagep)) == ENOENT)
#endif
- ret = DB_PAGE_NOTFOUND;
+ ret = USR_ERR(env, DB_PAGE_NOTFOUND);
} else
ret = __memp_fget(mpf, &p, ip, NULL, 0, &pagep);
msgfp->pgno = p;
@@ -748,16 +1434,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
RPRINT(env, (env, DB_VERB_REP_SYNC,
"sendpages: PAGE_FAIL on page %lu",
(u_long)p));
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
+ */
if (rp->rep_version < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env,
rp->rep_version,
(__rep_fileinfo_v6_args *)msgfp,
buf, msgsz, &len);
+ else if (rp->rep_version < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env,
+ rp->rep_version,
+ (__rep_fileinfo_v7_args *)msgfp,
+ buf, msgsz, &len);
else
ret = __rep_fileinfo_marshal(env,
rp->rep_version, msgfp, buf,
@@ -772,7 +1463,7 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0);
continue;
} else
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto err;
} else if (ret != 0)
goto err;
@@ -796,16 +1487,21 @@ __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
RPRINT(env, (env, DB_VERB_REP_SYNC,
"sendpages: %lu, page lsn [%lu][%lu]", (u_long)p,
(u_long)pagep->lsn.file, (u_long)pagep->lsn.offset));
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
+ */
if (rp->rep_version < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env,
rp->rep_version,
(__rep_fileinfo_v6_args *)msgfp,
buf, msgsz, &len);
+ else if (rp->rep_version < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env,
+ rp->rep_version,
+ (__rep_fileinfo_v7_args *)msgfp,
+ buf, msgsz, &len);
else
ret = __rep_fileinfo_marshal(env, rp->rep_version,
msgfp, buf, msgsz, &len);
@@ -1010,7 +1706,8 @@ __rep_update_setup(env, eid, rp, rec, savetime, lsn)
ZERO_LSN(lp->waiting_lsn);
ZERO_LSN(lp->max_wait_lsn);
ZERO_LSN(lp->max_perm_lsn);
- if (db_rep->rep_db == NULL)
+ ret = __rep_blob_cleanup(env, rep);
+ if (ret == 0 && db_rep->rep_db == NULL)
ret = __rep_client_dbinit(env, 0, REP_DB);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (ret != 0)
@@ -1148,6 +1845,337 @@ err: /*
return (ret);
}
+/*
+ * __rep_blob_update
+ *	Prepare to receive blob file data by setting up the blob gap database,
+ *	then requesting the blob file data.
+ *
+ *	rec holds a marshaled __rep_blob_update_args header followed by
+ *	num_blobs marshaled __rep_blob_file_args entries.  One gap database
+ *	record, keyed by (blob_sid, blob_id), is added for every megabyte of
+ *	every listed blob file, and the same payload is then sent to the master
+ *	in a REP_BLOB_ALL_REQ message.
+ *
+ *	The message is ignored (0 is returned) if the client is no longer in
+ *	SYNC_PAGE state, if it does not refer to the database currently being
+ *	synced, or if the gap database has already been populated.  Otherwise
+ *	returns 0 on success, EINVAL if a blob file is too large for the
+ *	client's off_t, or the error from a failed call.
+ *
+ * PUBLIC: int __rep_blob_update __P((ENV *, int, DB_THREAD_INFO *, DBT *));
+ */
+int
+__rep_blob_update(env, eid, ip, rec)
+	ENV *env;
+	int eid;
+	DB_THREAD_INFO *ip;
+	DBT *rec;
+{
+	DBC *dbc;
+	DB_REP *db_rep;
+	DBT data, key;
+	REP *rep;
+	REGINFO *infop;
+	__rep_blob_file_args rbf;
+	__rep_blob_update_args rbu;
+	__rep_fileinfo_args *rfp;
+	db_seq_t blob_fid;
+	int ret;
+	off_t offset;
+	size_t len;
+	u_int32_t num_blobs;
+	u_int8_t keybuf[BLOB_KEY_SIZE], *ptr;
+
+	db_rep = env->rep_handle;
+	rep = db_rep->region;
+	infop = env->reginfo;
+	rfp = NULL;
+	dbc = NULL;
+	memset(&rbu, 0, sizeof(__rep_blob_update_args));
+	memset(&rbf, 0, sizeof(__rep_blob_file_args));
+
+	if ((ret = __rep_blob_update_unmarshal(
+	    env, &rbu, rec->data, rec->size, &ptr)) != 0)
+		return (ret);
+	/* Bytes left in the record for the list of blob file entries. */
+	len = rec->size - __REP_BLOB_UPDATE_SIZE;
+
+	RPRINT(env, (env, DB_VERB_REP_SYNC,
+"blob_update: file_id %llu, num_blobs %lu, flags %lu, highest %llu",
+	    (unsigned long long)rbu.blob_fid, (unsigned long)rbu.num_blobs,
+	    (unsigned long)rbu.flags, (unsigned long long)rbu.highest_id));
+
+	MUTEX_LOCK(env, rep->mtx_clientdb);
+	REP_SYSTEM_LOCK(env);
+
+	/*
+	 * Check if the world changed.
+	 */
+	if (rep->sync_state != SYNC_PAGE)
+		goto unlock;
+
+	/* Make sure this is for the current database. */
+	GET_CURINFO(rep, infop, rfp);
+	GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret);
+	if (ret != 0)
+		goto unlock;
+
+	if (blob_fid != (db_seq_t)rbu.blob_fid)
+		goto unlock;
+
+	rep->highest_id = (db_seq_t)rbu.highest_id;
+	/*
+	 * For each blob file, add an entry to the database for each 1 MB
+	 * section of that file.  The entries will be deleted as the
+	 * corresponding blob chunks arrive and are written to disk.
+	 */
+	if (db_rep->blob_dbp == NULL &&
+	    (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0)
+		goto unlock;
+
+	if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0)
+		goto unlock;
+
+	/*
+	 * Make sure no one else has populated the database, this could happen
+	 * if the update message is sent twice.  Anything other than
+	 * DB_NOTFOUND (an existing record or a real error) ends processing.
+	 */
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	if ((ret = __dbc_get(dbc, &key, &data, DB_FIRST)) != DB_NOTFOUND)
+		goto unlock;
+
+	/* It is possible for a blob database to have no blobs. */
+	if (rbu.num_blobs == 0) {
+		(void)__dbc_close(dbc);
+		dbc = NULL;
+		rep->blob_more_files = 0;
+		rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0;
+		rep->last_blob_id = rep->last_blob_sid = 0;
+		rep->prev_blob_id = rep->prev_blob_sid = 0;
+		rep->gap_bl_hi_off = 0;
+		rep->blob_sync = 0;
+		rep->highest_id = 0;
+		rep->blob_rereq = 0;
+		ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0);
+		goto unlock;
+	}
+
+	/*
+	 * Both DBTs point at local storage: the key is rewritten for each blob
+	 * file and the data is the chunk offset, updated before every put.
+	 */
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	data.flags = key.flags = DB_DBT_USERMEM;
+	key.data = keybuf;
+	key.ulen = key.size = BLOB_KEY_SIZE;
+	data.data = (void *)&offset;
+	data.ulen = data.size = sizeof(offset);
+	num_blobs = 0;
+	while (num_blobs < rbu.num_blobs) {
+		if ((ret =
+		    __rep_blob_file_unmarshal(env, &rbf, ptr, len, &ptr)) != 0)
+			goto unlock;
+		len -= __REP_BLOB_FILE_SIZE;
+
+		RPRINT(env, (env, DB_VERB_REP_SYNC,
+	    "blob_update adding file: blob_id %llu, sdb_id %llu, blob_size %llu",
+		    (unsigned long long)rbf.blob_id,
+		    (unsigned long long)rbf.blob_sid,
+		    (unsigned long long)rbf.blob_size));
+
+		memcpy(keybuf, &rbf.blob_sid, BLOB_ID_SIZE);
+		memcpy(&(keybuf[BLOB_ID_SIZE]), &rbf.blob_id, BLOB_ID_SIZE);
+		offset = 0;
+		/*
+		 * Add an entry for each megabyte of the blob file.  Zero
+		 * length blob files should have at least one entry.
+		 */
+		do {
+			if ((ret = __dbc_put(dbc, &key, &data, 0)) != 0)
+				goto unlock;
+			offset += MEGABYTE;
+			/*
+			 * Check for overflow, this can happen when the master
+			 * supports 64-bit file offsets, but the client does
+			 * not.
+			 */
+			if (offset < 0) {
+				__db_errx(env,
+				    DB_STR("3704",
+				    "Blob file offset overflow"));
+				ret = EINVAL;
+				goto unlock;
+			}
+			/*
+			 * Compare at full width: narrowing the offset to 32
+			 * bits would wrap at 4 GB and never reach the end of
+			 * a larger blob file.
+			 */
+		} while ((u_int64_t)offset < rbf.blob_size);
+		num_blobs++;
+	}
+	/* Set whether there are more files after the ones on the list. */
+	if (F_ISSET(&rbu, BLOB_DONE))
+		rep->blob_more_files = 0;
+	else
+		rep->blob_more_files = 1;
+	/* Remember the last file of the previous list and of this list. */
+	rep->prev_blob_id = rep->last_blob_id;
+	rep->prev_blob_sid = rep->last_blob_sid;
+	rep->last_blob_sid = (db_seq_t)rbf.blob_sid;
+	rep->last_blob_id = (db_seq_t)rbf.blob_id;
+
+	/*
+	 * Send the same message payload in a REP_BLOB_ALL_REQ message to get
+	 * the blob data.  Peer-to-peer initialization is not supported for
+	 * blobs, so we can only send this back to the master despite the fact
+	 * that building the list of blob files is expensive.
+	 */
+	(void)__rep_send_message(
+	    env, rep->master_id, REP_BLOB_ALL_REQ, NULL, rec, 0, 0);
+
+unlock:	REP_SYSTEM_UNLOCK(env);
+	MUTEX_UNLOCK(env, rep->mtx_clientdb);
+	if (dbc != NULL)
+		(void)__dbc_close(dbc);
+
+	return (ret);
+}
+
+/*
+ * __rep_blob_allreq
+ *	Handle a request for blob file data.  rec carries the payload of a
+ *	REP_BLOB_UPDATE message: a marshaled __rep_blob_update_args header
+ *	followed by num_blobs marshaled __rep_blob_file_args entries.  Each
+ *	listed blob file is read and sent to eid as a series of REP_BLOB_CHUNK
+ *	messages of at most one megabyte of data each.
+ *
+ *	Returns 0 on success or the error from the first failed call.  Send
+ *	failures are ignored.
+ *
+ * PUBLIC: int __rep_blob_allreq __P((ENV *, int, DBT *));
+ */
+int
+__rep_blob_allreq(env, eid, rec)
+	ENV *env;
+	int eid;
+	DBT *rec;
+{
+	DB *dbp;
+	DB_FH *fhp;
+	DBT msg;
+	__rep_blob_chunk_args rbc;
+	__rep_blob_file_args rbf;
+	__rep_blob_update_args rbu;
+	db_seq_t old_sdb_id;
+	int done, ret;
+	off_t offset;
+	size_t len;
+	u_int32_t num_blobs;
+	u_int8_t *chunk_buf, *msg_buf, *ptr;
+
+	dbp = NULL;
+	fhp = NULL;
+	chunk_buf = msg_buf = NULL;
+	memset(&rbu, 0, sizeof(__rep_blob_update_args));
+	memset(&rbf, 0, sizeof(__rep_blob_file_args));
+	memset(&rbc, 0, sizeof(__rep_blob_chunk_args));
+	memset(&msg, 0, sizeof(DBT));
+
+	/* The message buffer holds a chunk header plus a full data chunk. */
+	if ((ret =
+	    __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &msg_buf)) != 0)
+		goto err;
+	msg.data = msg_buf;
+	msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE;
+	if ((ret = __os_malloc(env, MEGABYTE, &chunk_buf)) != 0)
+		goto err;
+	rbc.data.data = chunk_buf;
+	rbc.data.ulen = MEGABYTE;
+	rbc.data.flags = DB_DBT_USERMEM;
+
+	/*
+	 * The REP_BLOB_ALL_REQ message sends the REP_BLOB_UPDATE message
+	 * payload back to the master to request the actual blobs after the
+	 * client has prepared itself to receive them.
+	 */
+	len = rec->size;
+	if ((ret = __rep_blob_update_unmarshal(
+	    env, &rbu, rec->data, rec->size, &ptr)) != 0)
+		goto err;
+	len -= __REP_BLOB_UPDATE_SIZE;
+
+	RPRINT(env, (env, DB_VERB_REP_SYNC,
+	    "blob_all_req: file_id %llu, num_blobs %lu, flags %lu",
+	    (unsigned long long)rbu.blob_fid, (unsigned long)rbu.num_blobs,
+	    (unsigned long)rbu.flags));
+
+	/* A scratch handle that carries the blob directory ids. */
+	if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
+		goto err;
+	dbp->blob_file_id = (db_seq_t)rbu.blob_fid;
+	rbc.blob_fid = rbu.blob_fid;
+	num_blobs = 0;
+	/*
+	 * The list of files to send is included in the message, go
+	 * through the list and send each file in pieces.
+	 */
+	while (num_blobs < rbu.num_blobs) {
+		num_blobs++;
+		if ((ret = __rep_blob_file_unmarshal(
+		    env, &rbf, ptr, len, &ptr)) != 0)
+			goto err;
+		len -= __REP_BLOB_FILE_SIZE;
+		old_sdb_id = dbp->blob_sdb_id;
+		dbp->blob_sdb_id = (db_seq_t)rbf.blob_sid;
+		rbc.flags = 0;
+		rbc.blob_sid = rbf.blob_sid;
+		rbc.blob_id = rbf.blob_id;
+		/* Free the sub-directory information if it has changed. */
+		if (old_sdb_id != dbp->blob_sdb_id &&
+		    dbp->blob_sub_dir != NULL) {
+			__os_free(env, dbp->blob_sub_dir);
+			dbp->blob_sub_dir = NULL;
+		}
+		if (dbp->blob_sub_dir == NULL) {
+			if ((ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir,
+			    dbp->blob_file_id, dbp->blob_sdb_id)) != 0)
+				goto err;
+		}
+		if ((ret = __blob_file_open(dbp,
+		    &fhp, (db_seq_t)rbf.blob_id, DB_FOP_READONLY, 0)) != 0) {
+			/*
+			 * The file may have been deleted between creating the
+			 * list and sending the data.  Send a message saying
+			 * the file has been deleted.
+			 */
+			if (ret == ENOENT) {
+				F_SET(&rbc, BLOB_DELETE);
+				rbc.data.size = 0;
+				__rep_blob_chunk_marshal(env, &rbc, msg.data);
+				msg.size = __REP_BLOB_CHUNK_SIZE;
+				(void)__rep_send_message(env,
+				    eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+				ret = 0;
+				fhp = NULL;
+				continue;
+			}
+			goto err;
+		}
+		offset = 0;
+		/* Runs at least once so that empty files still get a chunk. */
+		do {
+			done = 0;
+			rbc.flags = 0;
+			if ((ret = __blob_file_read(
+			    env, fhp, &rbc.data, offset, MEGABYTE)) != 0)
+				goto err;
+			DB_ASSERT(env, rbc.data.size <= MEGABYTE);
+
+			/*
+			 * In rare cases the blob file may have gotten shorter
+			 * since the list was created.
+			 */
+			if (rbc.data.size < (u_int32_t)MEGABYTE && (u_int64_t)
+			    (offset + rbc.data.size) < rbf.blob_size) {
+				F_SET(&rbc, BLOB_CHUNK_FAIL);
+				done = 1;
+			}
+			/* File may have grown since the list was made. */
+			if ((u_int64_t)
+			    (offset + rbc.data.size) > rbf.blob_size) {
+				rbc.data.size =
+				    (u_int32_t)((off_t)rbf.blob_size - offset);
+			}
+			rbc.offset = (u_int64_t)offset;
+			__rep_blob_chunk_marshal(env, &rbc, msg.data);
+			msg.size = __REP_BLOB_CHUNK_SIZE + rbc.data.size;
+			(void)__rep_send_message(
+			    env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+			offset += MEGABYTE;
+		} while ((u_int64_t)offset < rbf.blob_size && !done);
+
+		/*
+		 * Forget the handle before looking at the result of the
+		 * close, so that a failed close is not followed by a second
+		 * close of the same handle in the cleanup code below.
+		 */
+		if (fhp != NULL) {
+			ret = __os_closehandle(env, fhp);
+			fhp = NULL;
+			if (ret != 0)
+				goto err;
+		}
+	}
+err:	if (chunk_buf != NULL)
+		__os_free(env, chunk_buf);
+	if (msg_buf != NULL)
+		__os_free(env, msg_buf);
+	if (fhp != NULL)
+		(void)__os_closehandle(env, fhp);
+	if (dbp != NULL)
+		(void)__db_close(dbp, NULL, 0);
+	return (ret);
+}
+
static int
__rep_find_inmem(env, rfp, unused)
ENV *env;
@@ -1157,6 +2185,11 @@ __rep_find_inmem(env, rfp, unused)
COMPQUIET(env, NULL);
COMPQUIET(unused, NULL);
+ /*
+ * Cannot assume all databases are in-memory because abbreviated
+ * internal inits from 5.3 and earlier are not limited to in-memory
+ * databases.
+ */
return (FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? DB_KEYEXIST : 0);
}
@@ -1172,12 +2205,9 @@ __rep_remove_nimdbs(env)
FILE_LIST_CTX context;
int ret;
- if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0)
+ if ((ret = __rep_init_file_list_context(env,
+ DB_REPVERSION, 0, 0, &context)) != 0)
return (ret);
- context.size = MEGABYTE;
- context.count = 0;
- context.fillptr = context.buf;
- context.version = DB_REPVERSION;
/* NB: "NULL" asks walk_dir to consider only in-memory DBs */
if ((ret = __rep_walk_dir(env, NULL, NULL, &context)) != 0)
@@ -1240,14 +2270,11 @@ __rep_remove_all(env, msg_version, rec)
* 1. Get list of databases currently present at this client, which we
* intend to remove.
*/
- if ((ret = __os_calloc(env, 1, MEGABYTE, &context.buf)) != 0)
- return (ret);
- context.size = MEGABYTE;
- context.count = 0;
- context.version = DB_REPVERSION;
/* Reserve space for the marshaled update_args. */
- context.fillptr = FIRST_FILE_PTR(context.buf);
+ if ((ret = __rep_init_file_list_context(env,
+ DB_REPVERSION, 0, 1, &context)) != 0)
+ return (ret);
if ((ret = __rep_find_dbs(env, &context)) != 0)
goto out;
@@ -1333,6 +2360,9 @@ __rep_remove_all(env, msg_version, rec)
FIRST_FILE_PTR(context.buf), context.size,
context.count, __rep_remove_file, NULL)) != 0)
goto out;
+ /* Remove the blob directory. */
+ if ((ret = __blob_del_hierarchy(env)) != 0)
+ goto out;
/*
* 4. Safe-store the (new) list of database files we intend to copy from
@@ -1445,6 +2475,8 @@ __rep_remove_file(env, rfp, unused)
#ifdef HAVE_QUEUE
DB_THREAD_INFO *ip;
#endif
+ APPNAME appname;
+ db_seq_t blob_fid, blob_sid;
char *name;
int ret, t_ret;
@@ -1496,29 +2528,53 @@ __rep_remove_file(env, rfp, unused)
* That will only have removed extent files. Now
* we need to deal with the actual file itself.
*/
+ appname = __rep_is_internal_rep_file(rfp->info.data) ?
+ DB_APP_META : (IS_BLOB_META(rfp->info.data) ?
+ DB_APP_BLOB : DB_APP_DATA);
if (FLD_ISSET(rfp->db_flags, DB_AM_INMEM)) {
if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
return (ret);
MAKE_INMEM(dbp);
F_SET(dbp, DB_AM_RECOVER); /* Skirt locking. */
ret = __db_inmem_remove(dbp, NULL, name);
- } else if ((ret = __fop_remove(env,
- NULL, rfp->uid.data, name, (const char **)&rfp->dir.data,
- __rep_is_internal_rep_file(rfp->info.data) ?
- DB_APP_META : DB_APP_DATA, 0)) != 0)
+ } else if ((ret = __fop_remove(env, NULL, rfp->uid.data, name,
+ (const char **)&rfp->dir.data, appname, 0)) != 0) {
/*
* If fop_remove fails, it could be because
* the client has a different data_dir
* structure than the master. Retry with the
- * local, default settings.
+ * local, default settings.
*/
ret = __fop_remove(env,
- NULL, rfp->uid.data, name, NULL,
- __rep_is_internal_rep_file(rfp->info.data) ?
- DB_APP_META : DB_APP_DATA, 0);
-#ifdef HAVE_QUEUE
-out:
+ NULL, rfp->uid.data, name, NULL, appname, 0);
+#ifdef DB_WIN32
+ /*
+ * Deleting a blob meta database can result in a
+ * ERROR_PATH_NOT_FOUND error on windows, so treat
+ * that as an ENOENT.
+ */
+ if (__os_posix_err(ret) == ENOENT)
+ ret = ENOENT;
#endif
+ }
+ /* Clean any blob directories. */
+ if (ret == 0 && appname == DB_APP_BLOB) {
+ /* dbp has not been set, since queues do not support blobs. */
+ DB_ASSERT(env, dbp == NULL);
+ if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
+ goto out;
+ if ((ret = __blob_path_to_dir_ids(
+ env, name, &blob_fid, &blob_sid)) != 0)
+ goto out;
+ /* blob_fid == 0 if it is the top level blob meta db. */
+ if (blob_fid != 0) {
+ dbp->blob_file_id = blob_fid;
+ dbp->blob_sdb_id = blob_sid;
+ if ((ret = __blob_del_all(dbp, NULL, 0)) != 0)
+ goto out;
+ }
+ }
+out:
if (dbp != NULL &&
(t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0)
ret = t_ret;
@@ -1610,10 +2666,11 @@ __rep_page(env, ip, eid, rp, rec)
{
DB_REP *db_rep;
- DBT key, data;
+ DBT data, key;
REP *rep;
__rep_fileinfo_args *msgfp, msgf;
__rep_fileinfo_v6_args *msgfpv6;
+ __rep_fileinfo_v7_args *msgfpv7;
db_recno_t recno;
int ret;
char *msg;
@@ -1647,21 +2704,30 @@ __rep_page(env, ip, eid, rp, rec)
(u_long)rep->first_lsn.offset));
return (DB_REP_PAGEDONE);
}
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (rp->rep_version < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env, rp->rep_version,
&msgfpv6, rec->data, rec->size, NULL)) != 0)
return (ret);
memcpy(&msgf, msgfpv6, sizeof(__rep_fileinfo_v6_args));
msgf.dir.data = NULL;
msgf.dir.size = 0;
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
msgfp = &msgf;
msgfree = msgfpv6;
+ } else if (rp->rep_version < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env, rp->rep_version,
+ &msgfpv7, rec->data, rec->size, NULL)) != 0)
+ return (ret);
+ memcpy(&msgf, msgfpv7, sizeof(__rep_fileinfo_v7_args));
+ msgf.blob_fid_lo = msgf.blob_fid_hi = 0;
+ msgfp = &msgf;
+ msgfree = msgfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version,
&msgfp, rec->data, rec->size, NULL)) != 0)
@@ -1671,9 +2737,9 @@ __rep_page(env, ip, eid, rp, rec)
MUTEX_LOCK(env, rep->mtx_clientdb);
REP_SYSTEM_LOCK(env);
/*
- * Check if the world changed.
+ * Check if the world changed or if we are in the blob sync phase.
*/
- if (rep->sync_state != SYNC_PAGE) {
+ if (rep->sync_state != SYNC_PAGE || rep->blob_sync != 0) {
ret = DB_REP_PAGEDONE;
goto err;
}
@@ -1785,6 +2851,218 @@ err: REP_SYSTEM_UNLOCK(env);
}
/*
+ * __rep_blob_chunk
+ *	Process a blob chunk message.  When a blob chunk arrives, delete its
+ *	entry in the blob chunk gap database to show that it has arrived, and
+ *	write the data to the blob file.
+ *
+ *	Returns DB_REP_PAGEDONE if the client is not in SYNC_PAGE state or the
+ *	chunk belongs to a file other than the one currently being synced.
+ *	A chunk with no matching gap database entry is treated as already
+ *	processed.  Otherwise returns the result of __rep_blobdone, or the
+ *	error from the first failed call.
+ *
+ * PUBLIC: int __rep_blob_chunk __P((ENV *, int, DB_THREAD_INFO *, DBT *));
+ */
+int
+__rep_blob_chunk(env, eid, ip, rec)
+	ENV *env;
+	int eid;
+	DB_THREAD_INFO *ip;
+	DBT *rec;
+{
+	DB_REP *db_rep;
+	DBC *dbc;
+	DB_FH *fhp;
+	DBT data, key;
+	REP *rep;
+	REGINFO *infop;
+	__rep_blob_chunk_args rbc;
+	__rep_fileinfo_args *rfp;
+	db_seq_t blob_fid;
+	char *blob_sub_dir, *last, *mkpath, *name, *path;
+	int ret;
+	off_t offset;
+	u_int8_t keybuf[BLOB_KEY_SIZE], *ptr;
+
+	ret = 0;
+	db_rep = env->rep_handle;
+	rep = db_rep->region;
+	infop = env->reginfo;
+	dbc = NULL;
+	blob_sub_dir = name = NULL;
+	path = NULL;
+	fhp = NULL;
+
+	/* Cheap early exit; the state is checked again under the locks. */
+	if (rep->sync_state != SYNC_PAGE)
+		return (DB_REP_PAGEDONE);
+
+	if ((ret = __rep_blob_chunk_unmarshal(
+	    env, &rbc, rec->data, rec->size, &ptr)) != 0)
+		return (ret);
+
+	MUTEX_LOCK(env, rep->mtx_clientdb);
+	REP_SYSTEM_LOCK(env);
+	/*
+	 * Check if the world changed.
+	 */
+	if (rep->sync_state != SYNC_PAGE) {
+		ret = DB_REP_PAGEDONE;
+		goto err;
+	}
+	/*
+	 * We should not ever be in internal init with a lease granted.
+	 */
+	DB_ASSERT(env,
+	    !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);
+
+	/* Make sure this is for the current file. */
+	GET_CURINFO(rep, infop, rfp);
+	GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, blob_fid, ret);
+	if (ret != 0)
+		goto err;
+
+	if (blob_fid != (db_seq_t)rbc.blob_fid) {
+		ret = DB_REP_PAGEDONE;
+		goto err;
+	}
+
+	RPRINT(env, (env, DB_VERB_REP_SYNC,
+"REP_BLOB_CHUNK: blob_fid %llu, blob_sid %llu, blob_id %llu, offset %llu",
+	    (unsigned long long)rbc.blob_fid,
+	    (unsigned long long)rbc.blob_sid,
+	    (unsigned long long)rbc.blob_id,
+	    (unsigned long long)rbc.offset));
+
+	if (db_rep->blob_dbp == NULL &&
+	    (ret = __rep_client_dbinit(env, 0, REP_BLOB)) != 0) {
+		RPRINT(env, (env, DB_VERB_REP_SYNC,
+		    "REP_BLOB_CHUNK: Client_dbinit %s",
+		    db_strerror(ret)));
+		goto err;
+	}
+
+	/*
+	 * Set the highest blob chunk received, ordering chunks by
+	 * (blob_sid, blob_id, offset).
+	 */
+	if (rbc.blob_sid > (u_int64_t)rep->gap_bl_hi_sid ||
+	    (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid &&
+	    rbc.blob_id > (u_int64_t)rep->gap_bl_hi_id) ||
+	    (rbc.blob_sid == (u_int64_t)rep->gap_bl_hi_sid &&
+	    rbc.blob_id == (u_int64_t)rep->gap_bl_hi_id &&
+	    rbc.offset > (u_int64_t)rep->gap_bl_hi_off)) {
+		rep->gap_bl_hi_id = (db_seq_t)rbc.blob_id;
+		rep->gap_bl_hi_sid = (db_seq_t)rbc.blob_sid;
+		rep->gap_bl_hi_off = (off_t)rbc.offset;
+	}
+
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	data.flags = key.flags = DB_DBT_USERMEM;
+	key.data = keybuf;
+	key.ulen = key.size = BLOB_KEY_SIZE;
+	data.data = (void *)&offset;
+	data.ulen = data.size = sizeof(offset);
+	/* BLOB_DELETE is set if the blob file was deleted. */
+	if (F_ISSET(&rbc, BLOB_DELETE)) {
+		memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE);
+		memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE);
+		if ((ret = __db_del(
+		    db_rep->blob_dbp, ip, NULL, &key, 0)) != 0) {
+			if (ret == DB_NOTFOUND)
+				ret = 0;
+			goto err;
+		}
+		goto done;
+	}
+
+	if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0)
+		goto err;
+	offset = (off_t)rbc.offset;
+	memcpy(keybuf, &rbc.blob_sid, BLOB_ID_SIZE);
+	memcpy(&(keybuf[BLOB_ID_SIZE]), &rbc.blob_id, BLOB_ID_SIZE);
+	/* If not found we have already dealt with this chunk. */
+	if ((ret = __dbc_get(dbc, &key, &data, DB_GET_BOTH)) != 0) {
+		if (ret == DB_NOTFOUND) {
+			/*
+			 * Close the cursor before the done processing, which
+			 * may truncate and close the gap database.
+			 */
+			ret = __dbc_close(dbc);
+			dbc = NULL;
+			if (ret != 0)
+				goto err;
+			goto done;
+		}
+		goto err;
+	}
+	/*
+	 * BLOB_CHUNK_FAIL is set if the blob file was truncated to shorter
+	 * than the BLOB_CHUNK offset.  Delete this entry and every following
+	 * duplicate for the same file.
+	 */
+	if (F_ISSET(&rbc, BLOB_CHUNK_FAIL)) {
+		while (ret == 0) {
+			if ((ret = __dbc_del(dbc, 0)) != 0)
+				goto err;
+			ret = __dbc_get(dbc, &key, &data, DB_NEXT_DUP);
+		}
+		/*
+		 * Running out of duplicates is the only successful way to
+		 * leave the loop; report any other error rather than letting
+		 * the cursor close overwrite it.
+		 */
+		if (ret != DB_NOTFOUND)
+			goto err;
+		ret = __dbc_close(dbc);
+		dbc = NULL;
+		if (ret != 0)
+			goto err;
+		goto done;
+	}
+	if ((ret = __dbc_del(dbc, 0)) != 0)
+		goto err;
+	/* Forget the cursor first so a failed close is not repeated at err. */
+	ret = __dbc_close(dbc);
+	dbc = NULL;
+	if (ret != 0)
+		goto err;
+
+	if ((ret = __blob_make_sub_dir(env, &blob_sub_dir,
+	    (db_seq_t)rbc.blob_fid, (db_seq_t)rbc.blob_sid)) != 0)
+		goto err;
+
+	if ((ret = __blob_id_to_path(
+	    env, blob_sub_dir, (db_seq_t)rbc.blob_id, &name)) != 0)
+		goto err;
+
+	if ((ret = __db_appname(env, DB_APP_BLOB, name, NULL, &path)) != 0 )
+		goto err;
+
+	/*
+	 * Temporarily cut the path at its last separator to test whether the
+	 * containing directory exists, creating it if it does not.
+	 */
+	last = __db_rpath(path);
+	DB_ASSERT(env, last != NULL);
+	*last = '\0';
+	if (__os_exists(env, path, NULL) != 0) {
+		*last = PATH_SEPARATOR[0];
+		mkpath = path;
+#ifdef DB_WIN32
+		/*
+		 * Absolute paths on windows can result in it creating a "C"
+		 * or "D" directory in the working directory.
+		 */
+		if (__os_abspath(mkpath))
+			mkpath += 2;
+#endif
+		if ((ret = __db_mkpath(env, mkpath)) != 0)
+			goto err;
+	}
+	*last = PATH_SEPARATOR[0];
+	if ((ret = __os_open(
+	    env, path, 0, DB_OSO_CREATE, env->db_mode, &fhp)) != 0)
+		goto err;
+
+	/* Write the data into the blob file. */
+	if ((ret = __fop_write_file(env, NULL, name, NULL, DB_APP_BLOB,
+	    fhp, (off_t)rbc.offset, rbc.data.data, rbc.data.size, 0)) != 0)
+		goto err;
+	/* Forget the handle first so a failed close is not repeated at err. */
+	ret = __os_closehandle(env, fhp);
+	fhp = NULL;
+	if (ret != 0)
+		goto err;
+
+done:	ret = __rep_blobdone(env, eid, ip, rep, blob_fid, 0);
+
+err:	REP_SYSTEM_UNLOCK(env);
+	MUTEX_UNLOCK(env, rep->mtx_clientdb);
+	if (path != NULL)
+		__os_free(env, path);
+	if (blob_sub_dir != NULL)
+		__os_free(env, blob_sub_dir);
+	if (name != NULL)
+		__os_free(env, name);
+	if (fhp != NULL)
+		(void)__os_closehandle(env, fhp);
+	if (dbc != NULL)
+		(void)__dbc_close(dbc);
+
+	return (ret);
+}
+
+/*
* __rep_write_page -
* Write this page into a database.
*/
@@ -1801,13 +3079,16 @@ __rep_write_page(env, ip, rep, msgfp)
DB_PGINFO *pginfo;
DB_REP *db_rep;
REGINFO *infop;
+ APPNAME appname;
__rep_fileinfo_args *rfp;
+ char *blob_path;
int ret;
void *dst;
db_rep = env->rep_handle;
infop = env->reginfo;
rfp = NULL;
+ blob_path = NULL;
/*
* If this is the first page we're putting in this database, we need
@@ -1830,15 +3111,39 @@ __rep_write_page(env, ip, rep, msgfp)
RPRINT(env, (env, DB_VERB_REP_SYNC,
"rep_write_page: Calling fop_create for %s",
(char *)rfp->info.data));
+ appname = (__rep_is_internal_rep_file(rfp->info.data) ?
+ DB_APP_META : (IS_BLOB_META((char *)rfp->info.data)
+ ? DB_APP_BLOB : DB_APP_DATA));
+ /*
+ * May have to create the directory structure for blob
+ * metadata databases.
+ */
+ if (appname == DB_APP_BLOB) {
+ if ((ret = __db_appname(env,
+ appname, rfp->info.data,
+ (const char **)&rfp->dir.data,
+ &blob_path)) != 0)
+ goto err;
+#ifdef DB_WIN32
+ /*
+ * Absolute paths on windows can result in
+ * it creating a "C" or "D"
+ * directory in the working directory.
+ */
+ if (__os_abspath(blob_path))
+ blob_path += 2;
+#endif
+ if ((ret = __db_mkpath(env, blob_path)) != 0)
+ goto err;
+ }
if ((ret = __fop_create(env, NULL, NULL,
rfp->info.data, (const char **)&rfp->dir.data,
- __rep_is_internal_rep_file(rfp->info.data) ?
- DB_APP_META : DB_APP_DATA, env->db_mode, 0)) != 0) {
+ appname, env->db_mode, 0)) != 0) {
/*
* If fop_create fails, it could be because
* the client has a different data_dir
* structure than the master. Retry with the
- * local, default settings.
+ * local, default settings.
*/
RPRINT(env, (env, DB_VERB_REP_SYNC,
"rep_write_page: fop_create ret %d. Retry for %s, master datadir %s",
@@ -1929,7 +3234,10 @@ __rep_write_page(env, ip, rep, msgfp)
ret = __memp_fput(db_rep->file_mpf,
ip, dst, db_rep->file_dbp->priority);
-err: return (ret);
+err: if (blob_path != NULL)
+ __os_free(env, blob_path);
+
+ return (ret);
}
/*
@@ -1976,7 +3284,7 @@ __rep_page_gap(env, rep, msgfp, type)
* Make sure we're still talking about the same file.
* If not, we're done here.
*/
- if (rfp->filenum != msgfp->filenum) {
+ if (rfp->filenum != msgfp->filenum || rep->blob_sync != 0) {
ret = DB_REP_PAGEDONE;
goto err;
}
@@ -2135,6 +3443,53 @@ err:
}
/*
+ * __rep_blob_cleanup -
+ *	Discard blob internal init state: empty and close the blob chunk gap
+ *	database if it is open, then zero the blob control values kept in
+ *	the replication region.  Returns the truncate error if there was one,
+ *	otherwise the result of the close, or 0 if no database was open.
+ *
+ * Caller must hold client database mutex (mtx_clientdb) and
+ * REP_SYSTEM_LOCK.
+ */
+static int
+__rep_blob_cleanup(env, rep)
+	ENV *env;
+	REP *rep;
+{
+	DB *gap_dbp;
+	DB_REP *db_rep;
+	DB_THREAD_INFO *ip;
+	int close_ret, ret;
+	u_int32_t ndeleted;
+
+	db_rep = env->rep_handle;
+	gap_dbp = db_rep->blob_dbp;
+	ret = 0;
+
+	/*
+	 * The gap database describes the blob chunks that have not arrived
+	 * yet.  Records left behind could interfere with how the next
+	 * REP_BLOB_UPDATE message is handled, so throw them all away.
+	 */
+	if (gap_dbp != NULL) {
+		ENV_GET_THREAD_INFO(env, ip);
+		ret = __db_truncate(gap_dbp, ip, NULL, &ndeleted);
+		close_ret = __db_close(gap_dbp, NULL, DB_NOSYNC);
+		db_rep->blob_dbp = NULL;
+		/* A truncate failure takes precedence over a close failure. */
+		if (ret == 0)
+			ret = close_ret;
+	}
+
+	/* Reset blob internal init control values. */
+	rep->blob_more_files = 0;
+	rep->blob_rereq = 0;
+	rep->blob_sync = 0;
+	rep->highest_id = 0;
+	rep->gap_bl_hi_off = 0;
+	rep->gap_bl_hi_sid = 0;
+	rep->gap_bl_hi_id = 0;
+	rep->last_blob_sid = 0;
+	rep->last_blob_id = 0;
+	rep->prev_blob_sid = 0;
+	rep->prev_blob_id = 0;
+
+	return (ret);
+}
+
+/*
* __rep_init_cleanup -
* Clean up internal initialization pieces.
*
@@ -2162,9 +3517,10 @@ __rep_init_cleanup(env, rep, force)
/*
* 1. Close up the file data pointer we used.
* 2. Close/reset the page database.
- * 3. Close/reset the queue database if we're forcing a cleanup.
- * 4. Free current file info.
- * 5. If we have all files or need to force, free original file info.
+ * 3. Close/truncate the blob chunk gap database.
+ * 4. Close/reset the queue database if we're forcing a cleanup.
+ * 5. Free current file info.
+ * 6. If we have all files or need to force, free original file info.
*/
if (db_rep->file_mpf != NULL) {
ret = __memp_fclose(db_rep->file_mpf, 0);
@@ -2176,6 +3532,15 @@ __rep_init_cleanup(env, rep, force)
if (ret == 0)
ret = t_ret;
}
+ /*
+ * Truncate the blob chunk gap database, since entries in the database
+ * are for blob chunks we are expecting to arrive. Also reset blob
+ * internal init control values.
+ */
+ t_ret = __rep_blob_cleanup(env, rep);
+ if (ret == 0)
+ ret = t_ret;
+
if (force && db_rep->queue_dbc != NULL) {
queue_dbp = db_rep->queue_dbc->dbp;
if ((t_ret = __dbc_close(db_rep->queue_dbc)) != 0 && ret == 0)
@@ -2324,8 +3689,8 @@ __rep_clean_interrupted(env)
* __rep_filedone -
* We need to check if we're done with the current file after
* processing the current page. Stat the database to see if
- * we have all the pages. If so, we need to clean up/close
- * this one, set up for the next one, and ask for its pages,
+ * we have all the pages and blobs. If so, we need to clean up/close
+ * this one, set up for the next one, and ask for its pages and blobs,
* or if this is the last file, request the log records and
* move to the REP_RECOVER_LOG state.
*/
@@ -2338,9 +3703,14 @@ __rep_filedone(env, ip, eid, rep, msgfp, type)
__rep_fileinfo_args *msgfp;
u_int32_t type;
{
+ DBT msg;
REGINFO *infop;
__rep_fileinfo_args *rfp;
+ __rep_blob_update_req_args rbur;
int ret;
+ u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE];
+
+ memset(&msg, 0, sizeof(DBT));
/*
* We've put our page, now we need to do any gap processing
@@ -2375,8 +3745,96 @@ __rep_filedone(env, ip, eid, rep, msgfp, type)
((ret = __rep_queue_filedone(env, ip, rep, rfp)) !=
DB_REP_PAGEDONE))
return (ret);
+
+ /* Request blob files. */
+ if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) {
+ ret = 0;
+ rep->blob_sync = 1;
+ memset(&rbur, 0, sizeof(__rep_blob_update_req_args));
+ GET_LO_HI(env,
+ rfp->blob_fid_lo, rfp->blob_fid_hi, rbur.blob_fid, ret);
+ msg.size = __REP_BLOB_UPDATE_REQ_SIZE;
+ msg.data = buf;
+ __rep_blob_update_req_marshal(env, &rbur, msg.data);
+ (void)__rep_send_message(env,
+ rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0);
+ return (ret);
+ }
+
+ /*
+ * We have all the data for this file. Clean up.
+ */
+ if ((ret = __rep_init_cleanup(env, rep, 0)) != 0)
+ return (ret);
+
+ rep->curfile++;
+ ret = __rep_nextfile(env, eid, rep);
+
+ return (ret);
+}
+
+/*
+ * __rep_blobdone -
+ * We need to check if we're done with the current file after
+ * processing the current blob chunk.
+ *
+ * Caller must hold client database mutex (mtx_clientdb) and
+ * REP_SYSTEM_LOCK.
+ */
+static int
+__rep_blobdone(env, eid, ip, rep, blob_fid, force)
+ ENV *env;
+ int eid;
+ DB_THREAD_INFO *ip;
+ REP *rep;
+ db_seq_t blob_fid;
+ int force;
+{
+ DBT msg;
+ __rep_blob_update_req_args rbur;
+ int done, ret;
+ u_int8_t buf[__REP_BLOB_UPDATE_REQ_SIZE];
+
/*
- * We have all the pages for this file. Clean up.
+ * We've written our blob chunk, now we need to do any gap processing
+ * that might be needed to re-request chunks.
+ */
+ done = 0;
+ ret = __rep_blob_chunk_gap(env, eid, ip, rep, &done, blob_fid, force);
+ /*
+ * The world changed while we were doing gap processing.
+ * We're done here.
+ */
+ if (ret == DB_REP_PAGEDONE)
+ return (0);
+ else if (ret != 0)
+ goto err;
+
+ /*
+ * If the blob database is empty then all files in the current list
+ * have been processed. However, there may be more files on the
+ * master, so request the next list if that is the case.
+ */
+ if (done && rep->blob_more_files) {
+ memset(&rbur, 0, sizeof(__rep_blob_update_req_args));
+ rbur.blob_fid = (u_int64_t)blob_fid;
+ rbur.blob_sid = (u_int64_t)rep->last_blob_sid;
+ rbur.blob_id = (u_int64_t)rep->last_blob_id;
+ rbur.highest_id = (u_int64_t)rep->highest_id;
+ rep->gap_bl_hi_id = rep->gap_bl_hi_sid = 0;
+ rep->gap_bl_hi_off = 0;
+ rep->blob_rereq = 0;
+ msg.size = __REP_BLOB_UPDATE_REQ_SIZE;
+ msg.data = buf;
+ __rep_blob_update_req_marshal(env, &rbur, msg.data);
+ (void)__rep_send_message(env,
+ rep->master_id, REP_BLOB_UPDATE_REQ, NULL, &msg, 0, 0);
+ return (0);
+ } else if (!done)
+ return (0);
+
+ /*
+ * We have all the data for this file. Clean up.
*/
if ((ret = __rep_init_cleanup(env, rep, 0)) != 0)
goto err;
@@ -2388,6 +3846,255 @@ err:
}
/*
+ * __rep_blob_chunk_gap -
+ *	We have written a blob chunk.  Now check if there are any that need
+ *	to be re-requested.  The blob chunk gap database contains
+ *	descriptions of all the blob chunks that have yet to arrive.
+ *
+ *	eid	 - unused for now (see the COMPQUIET below).
+ *	done	 - set to 1 when the gap database holds no entries, i.e. all
+ *		   expected chunks have arrived; set to 0 otherwise.
+ *	blob_fid - blob file id the caller was working on.
+ *	force	 - non-zero to send the re-requests without first consulting
+ *		   __rep_check_doreq.
+ *
+ *	Returns 0 on success, DB_REP_PAGEDONE if the current file's blob file
+ *	id no longer matches blob_fid, or the error of the failing call.
+ *
+ *	Caller must hold client database mutex (mtx_clientdb) and
+ *	REP_SYSTEM_LOCK.
+ */
+static int
+__rep_blob_chunk_gap(env, eid, ip, rep, done, blob_fid, force)
+	ENV *env;
+	int eid;
+	DB_THREAD_INFO *ip;
+	REP *rep;
+	int *done;
+	db_seq_t blob_fid;
+	int force;
+{
+	DBC *dbc;
+	DBT data, high, key, msg;
+	DB_LOG *dblp;
+	DB_REP *db_rep;
+	LOG *lp;
+	REGINFO *infop;
+	__rep_blob_chunk_req_args rbcr;
+	__rep_fileinfo_args *rfp;
+	db_seq_t cur_blob_fid;
+	off_t offset;
+	int cmp, past_high, ret;
+	u_int8_t buf[BLOB_KEY_SIZE], msgbuf[__REP_BLOB_CHUNK_REQ_SIZE];
+
+	db_rep = env->rep_handle;
+	dblp = env->lg_handle;
+	lp = dblp->reginfo.primary;
+	infop = env->reginfo;
+	ret = 0;
+	dbc = NULL;
+	*done = 0;
+
+	/* eid will be used when peer-to-peer is re-enabled for blobs. */
+	COMPQUIET(eid, 0);
+
+	/*
+	 * Make sure we're still talking about the same file.
+	 * If not, we're done here.
+	 */
+	GET_CURINFO(rep, infop, rfp);
+	GET_LO_HI(env, rfp->blob_fid_lo, rfp->blob_fid_hi, cur_blob_fid, ret);
+	/*
+	 * GET_LO_HI reports failure through ret; do not trust cur_blob_fid
+	 * if it did (same check as in __rep_blob_rereq).
+	 */
+	if (ret != 0)
+		goto err;
+	if (cur_blob_fid != blob_fid) {
+		ret = DB_REP_PAGEDONE;
+		goto err;
+	}
+
+	/* Get the first missing blob chunk. */
+	if ((ret = __db_cursor(db_rep->blob_dbp, ip, NULL, &dbc, 0)) != 0)
+		goto err;
+	memset(&key, 0, sizeof(DBT));
+	memset(&data, 0, sizeof(DBT));
+	memset(&msg, 0, sizeof(DBT));
+	ret = __dbc_get(dbc, &key, &data, DB_FIRST);
+	if (ret == DB_NOTFOUND) {
+		/* All blobs received. */
+		ret = 0;
+		*done = 1;
+		goto err;
+	} else if (ret != 0)
+		goto err;
+
+	DB_ASSERT(env, key.size == BLOB_KEY_SIZE);
+	DB_ASSERT(env, data.size == sizeof(off_t));
+	/*
+	 * Copy rather than cast: the record lives in a database buffer
+	 * that is not guaranteed to be aligned for off_t.
+	 */
+	memcpy(&offset, data.data, sizeof(off_t));
+	/*
+	 * Format the sdbid and id of the high chunk as a blob gap
+	 * database key, so it can be compared with the entries in that
+	 * database.
+	 */
+	memset(&high, 0, sizeof(DBT));
+	memcpy(buf, &rep->gap_bl_hi_sid, BLOB_ID_SIZE);
+	memcpy(buf + BLOB_ID_SIZE, &rep->gap_bl_hi_id, BLOB_ID_SIZE);
+	high.data = buf;
+	high.size = BLOB_KEY_SIZE;
+
+	/*
+	 * past_high is true when the current gap entry lies beyond the
+	 * highest chunk received so far.
+	 */
+	cmp = __rep_blob_cmp(NULL, &key, &high, NULL);
+	past_high = cmp > 0 || (cmp == 0 && offset > rep->gap_bl_hi_off);
+
+	/*
+	 * If the first chunk in the database is larger than the highest chunk
+	 * received, then there is no gap.
+	 *
+	 * If a gap does exist, check if it is time to do a re-request.  If so,
+	 * re-request every chunk that exists before the highest received.
+	 */
+	if (!force && past_high) {
+		lp->wait_ts = db_rep->request_gap;
+		__os_gettime(env, &lp->rcvd_ts, 1);
+	} else if (force || __rep_check_doreq(env, rep)) {
+		/*
+		 * Re-request every chunk less than the highest one, plus the
+		 * next blob chunk that we are expecting.  The next expected
+		 * blob chunk is requested in case the last blob chunk is lost
+		 * in transit.
+		 */
+		for (;;) {
+			memset(&rbcr, 0, sizeof(__rep_blob_chunk_req_args));
+			memcpy(&(rbcr.blob_sid), key.data, BLOB_ID_SIZE);
+			memcpy(&(rbcr.blob_id),
+			    (u_int8_t *)key.data + BLOB_ID_SIZE, BLOB_ID_SIZE);
+			/*
+			 * Use the off_t already copied out of the record;
+			 * reading the record as a u_int64_t would overrun
+			 * it wherever off_t is narrower than 64 bits.
+			 */
+			rbcr.offset = (u_int64_t)offset;
+			rbcr.blob_fid = (u_int64_t)blob_fid;
+			msg.size = __REP_BLOB_CHUNK_REQ_SIZE;
+			msg.data = msgbuf;
+			RPRINT(env, (env, DB_VERB_REP_SYNC,
+"blob_chunk_gap: Req file_id %llu, sdb_id %llu, blob_id %llu, offset %llu",
+			    (long long)rbcr.blob_fid, (long long)rbcr.blob_sid,
+			    (long long)rbcr.blob_id, (long long)rbcr.offset));
+			__rep_blob_chunk_req_marshal(env, &rbcr, msg.data);
+			/*
+			 * Note that peer-to-peer initialization is not
+			 * supported for blobs.
+			 */
+			(void)__rep_send_message(
+			    env, rep->master_id,
+			    REP_BLOB_CHUNK_REQ, NULL, &msg, 0, 0);
+			/*
+			 * Break after requesting the chunk after the highest
+			 * one.
+			 */
+			if (past_high)
+				break;
+			if ((ret = __dbc_get(
+			    dbc, &key, &data, DB_NEXT)) != 0) {
+				if (ret == DB_NOTFOUND) {
+					ret = 0;
+					break;
+				}
+				goto err;
+			}
+			/*
+			 * Refresh the offset and the comparison for the
+			 * record just fetched, so the break test above
+			 * never looks at the previous record's offset.
+			 */
+			DB_ASSERT(env, key.size == BLOB_KEY_SIZE);
+			DB_ASSERT(env, data.size == sizeof(off_t));
+			memcpy(&offset, data.data, sizeof(off_t));
+			cmp = __rep_blob_cmp(NULL, &key, &high, NULL);
+			past_high = cmp > 0 ||
+			    (cmp == 0 && offset > rep->gap_bl_hi_off);
+		}
+	}
+
+err:	if (dbc != NULL)
+		(void)__dbc_close(dbc);
+
+	return (ret);
+}
+
+/*
+ * __rep_blob_chunk_req
+ *	Answer a request for a specific blob chunk.
+ *
+ *	The request in rec is unmarshalled, the named blob file is opened
+ *	read-only and up to MEGABYTE bytes starting at the requested offset
+ *	are sent back to eid in a REP_BLOB_CHUNK message.  A missing file is
+ *	answered with an empty chunk flagged BLOB_DELETE; a zero-length read
+ *	is flagged BLOB_CHUNK_FAIL.
+ *
+ * PUBLIC: int __rep_blob_chunk_req __P((ENV *, int, DBT *));
+ */
+int
+__rep_blob_chunk_req(env, eid, rec)
+	ENV *env;
+	int eid;
+	DBT *rec;
+{
+	DB *dbp;
+	DBT msg;
+	DB_FH *fhp;
+	__rep_blob_chunk_args reply;
+	__rep_blob_chunk_req_args req;
+	int ret;
+	u_int8_t *payload, *wire, *next;
+
+	payload = wire = NULL;
+	fhp = NULL;
+	dbp = NULL;
+
+	/* Outgoing message buffer: chunk header plus a full chunk. */
+	ret = __os_malloc(env, MEGABYTE + __REP_BLOB_CHUNK_SIZE, &wire);
+	if (ret != 0)
+		goto err;
+	memset(&msg, 0, sizeof(DBT));
+	msg.data = wire;
+	msg.ulen = MEGABYTE + __REP_BLOB_CHUNK_SIZE;
+
+	/* Scratch buffer the blob data is read into. */
+	ret = __os_malloc(env, MEGABYTE, &payload);
+	if (ret != 0)
+		goto err;
+	memset(&reply, 0, sizeof(__rep_blob_chunk_args));
+	reply.data.data = payload;
+	reply.data.ulen = MEGABYTE;
+	reply.data.flags = DB_DBT_USERMEM;
+
+	ret = __rep_blob_chunk_req_unmarshal(
+	    env, &req, rec->data, rec->size, &next);
+	if (ret != 0)
+		goto err;
+
+	RPRINT(env, (env, DB_VERB_REP_SYNC,
+	    "blob_chunk_req: file_id %llu, sdbid %llu, id %llu, offset %llu",
+	    (long long)req.blob_fid, (long long)req.blob_sid,
+	    (long long)req.blob_id, (long long)req.offset));
+
+	/* The reply identifies the chunk exactly as the request did. */
+	reply.blob_fid = req.blob_fid;
+	reply.blob_id = req.blob_id;
+	reply.blob_sid = req.blob_sid;
+	reply.offset = req.offset;
+
+	/* A throwaway handle carrying just enough state to find the file. */
+	ret = __db_create_internal(&dbp, env, 0);
+	if (ret != 0)
+		goto err;
+	dbp->blob_file_id = (db_seq_t)req.blob_fid;
+	dbp->blob_sdb_id = (db_seq_t)req.blob_sid;
+	ret = __blob_make_sub_dir(env, &dbp->blob_sub_dir,
+	    (db_seq_t)req.blob_fid, (db_seq_t)req.blob_sid);
+	if (ret != 0)
+		goto err;
+
+	ret = __blob_file_open(
+	    dbp, &fhp, (db_seq_t)req.blob_id, DB_FOP_READONLY, 0);
+	if (ret == ENOENT) {
+		/*
+		 * The file may have been deleted between creating the
+		 * list and sending the request.  Send a message saying
+		 * the file has been deleted.
+		 */
+		ret = 0;
+		F_SET(&reply, BLOB_DELETE);
+		reply.data.size = 0;
+		__rep_blob_chunk_marshal(env, &reply, msg.data);
+		msg.size = __REP_BLOB_CHUNK_SIZE;
+		(void)__rep_send_message(
+		    env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+		goto err;
+	}
+	if (ret != 0)
+		goto err;
+
+	ret = __blob_file_read(
+	    env, fhp, &reply.data, (off_t)req.offset, MEGABYTE);
+	if (ret != 0)
+		goto err;
+	DB_ASSERT(env, reply.data.size <= MEGABYTE);
+
+	/*
+	 * In rare cases the blob file may have gotten shorter
+	 * since the list was created.
+	 */
+	if (reply.data.size == 0)
+		F_SET(&reply, BLOB_CHUNK_FAIL);
+	__rep_blob_chunk_marshal(env, &reply, msg.data);
+	msg.size = __REP_BLOB_CHUNK_SIZE + reply.data.size;
+	(void)__rep_send_message(env, eid, REP_BLOB_CHUNK, NULL, &msg, 0, 0);
+
+	/* Common exit: release whatever was acquired above. */
+err:	if (payload != NULL)
+		__os_free(env, payload);
+	if (wire != NULL)
+		__os_free(env, wire);
+	if (fhp != NULL)
+		(void)__os_closehandle(env, fhp);
+	if (dbp != NULL)
+		(void)__db_close(dbp, NULL, 0);
+	return (ret);
+}
+
+/*
* Starts requesting pages for the next file in the list (if any), or if not,
* proceeds to the next stage: requesting logs.
*
@@ -2404,19 +4111,25 @@ __rep_nextfile(env, eid, rep)
DBT dbt;
__rep_logreq_args lr_args;
DB_LOG *dblp;
+ DB_REP *db_rep;
+ DELAYED_BLOB_LIST *dbl;
LOG *lp;
REGENV *renv;
REGINFO *infop;
__rep_fileinfo_args *curinfo, *rfp, rf;
__rep_fileinfo_v6_args *rfpv6;
- int *curbuf, ret;
+ __rep_fileinfo_v7_args *rfpv7;
+ int *curbuf, ret, view_partial;
u_int8_t *buf, *info_ptr, lrbuf[__REP_LOGREQ_SIZE], *nextinfo;
size_t len, msgsz;
+ char *name;
void *rffree;
infop = env->reginfo;
renv = infop->primary;
+ db_rep = env->rep_handle;
rfp = NULL;
+ dbl = NULL;
/*
* Always direct the next request to the master (at least nominally),
@@ -2430,13 +4143,13 @@ __rep_nextfile(env, eid, rep)
/* Set curinfo to next file and examine it. */
info_ptr = R_ADDR(infop,
rep->originfo_off + (rep->originfolen - rep->infolen));
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (rep->infoversion < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env,
rep->infoversion, &rfpv6,
info_ptr, rep->infolen, &nextinfo)) != 0)
@@ -2444,8 +4157,18 @@ __rep_nextfile(env, eid, rep)
memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args));
rf.dir.data = NULL;
rf.dir.size = 0;
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
rfp = &rf;
rffree = rfpv6;
+ } else if (rep->infoversion < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env,
+ rep->infoversion, &rfpv7,
+ info_ptr, rep->infolen, &nextinfo)) != 0)
+ return (ret);
+ memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args));
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
+ rfp = &rf;
+ rffree = rfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env,
rep->infoversion, &rfp, info_ptr,
@@ -2457,6 +4180,14 @@ __rep_nextfile(env, eid, rep)
}
rffree = rfp;
}
+#ifndef HAVE_64BIT_TYPES
+ if (rfp->blob_fid_lo != 0 || rfp->blob_fid_hi != 0) {
+ __db_errx(env, DB_STR("3705",
+ "Blobs require 64 integer compiler support."));
+ __os_free(env, rffree);
+ return (DB_OPNOTSUP);
+ }
+#endif
rep->infolen -= (u_int32_t)(nextinfo - info_ptr);
MUTEX_LOCK(env, renv->mtx_regenv);
ret = __env_alloc(infop, sizeof(__rep_fileinfo_args) +
@@ -2484,19 +4215,55 @@ __rep_nextfile(env, eid, rep)
rfp->dir.data, rfp->dir.size);
__os_free(env, rffree);
- /* Skip over regular DB's in "abbreviated" internal inits. */
- if (F_ISSET(rep, REP_F_ABBREVIATED) &&
+ /*
+ * If a partial callback is set, invoke the callback to see if
+ * this file should be replicated.
+ */
+ if (IS_VIEW_SITE(env) && curinfo->info.size > 0 &&
!FLD_ISSET(curinfo->db_flags, DB_AM_INMEM)) {
+ name = (char *)curinfo->info.data;
+ DB_ASSERT(env, db_rep->partial != NULL);
+ /*
+ * Always replicate system owned databases.
+ */
+ if (IS_DB_FILE(name) && !IS_BLOB_META(name))
+ view_partial = 1;
+ else if ((ret = __rep_call_partial(env,
+ name, &view_partial, 0, &dbl)) != 0) {
+ VPRINT(env, (env, DB_VERB_REP_SYNC,
+ "rep_nextfile: partial cb err %d for %s",
+ ret, name));
+ return (ret);
+ }
+ /*
+ * dbl != NULL when we could not find the name of the
+ * database that owns a blob meta database. If that
+ * happens then it was never opened, which means it
+ * was not replicated, and as such neither should its
+ * bmd be replicated.
+ */
+ if (dbl != NULL) {
+ view_partial = 0;
+ __os_free(env, dbl);
+ dbl = NULL;
+ }
VPRINT(env, (env, DB_VERB_REP_SYNC,
- "Skipping file %d in abbreviated internal init",
- curinfo->filenum));
- MUTEX_LOCK(env, renv->mtx_regenv);
- __env_alloc_free(infop,
- R_ADDR(infop, rep->curinfo_off));
- MUTEX_UNLOCK(env, renv->mtx_regenv);
- rep->curinfo_off = INVALID_ROFF;
- rep->curfile++;
- continue;
+ "rep_nextfile: %s file %s %d on view site.",
+ view_partial == 0 ?
+ "Skipping" : "Replicating",
+ name, curinfo->filenum));
+ /*
+ * If we're skipping the file, move to the next one.
+ */
+ if (view_partial == 0) {
+ MUTEX_LOCK(env, renv->mtx_regenv);
+ __env_alloc_free(infop,
+ R_ADDR(infop, rep->curinfo_off));
+ MUTEX_UNLOCK(env, renv->mtx_regenv);
+ rep->curinfo_off = INVALID_ROFF;
+ rep->curfile++;
+ continue;
+ }
}
/* Request this file's pages. */
@@ -2519,15 +4286,19 @@ __rep_nextfile(env, eid, rep)
curinfo->uid.size + curinfo->info.size;
if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0)
return (ret);
+ /*
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
+ */
if (rep->infoversion < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env, rep->infoversion,
(__rep_fileinfo_v6_args *)curinfo, buf,
msgsz, &len);
+ else if (rep->infoversion < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env, rep->infoversion,
+ (__rep_fileinfo_v7_args *)curinfo, buf,
+ msgsz, &len);
else
ret = __rep_fileinfo_marshal(env, rep->infoversion,
curinfo, buf, msgsz, &len);
@@ -2834,16 +4605,19 @@ __rep_pggap_req(env, rep, reqfp, gapflags)
* new info into rep->finfo. Assert that the sizes never
* change. The only thing this should do is change
* the pgno field. Everything else remains the same.
+ *
+ * It is safe to cast to the old structs
+ * because the first part of the current
+ * struct matches the old structs.
*/
if (rep->infoversion < DB_REPVERSION_53)
- /*
- * It is safe to cast to the old struct
- * because the first part of the current
- * struct matches the old struct.
- */
ret = __rep_fileinfo_v6_marshal(env, rep->infoversion,
(__rep_fileinfo_v6_args *)tmpfp, buf,
msgsz, &len);
+ else if (rep->infoversion < DB_REPVERSION_61)
+ ret = __rep_fileinfo_v7_marshal(env, rep->infoversion,
+ (__rep_fileinfo_v7_args *)tmpfp, buf,
+ msgsz, &len);
else
ret = __rep_fileinfo_marshal(env, rep->infoversion,
tmpfp, buf, msgsz, &len);
@@ -2865,6 +4639,94 @@ err:
}
/*
+ * __rep_blob_rereq -
+ *
+ * Re-request lost blob messages, such as REP_BLOB_CHUNK_REQ, REP_BLOB_ALL_REQ,
+ * or REP_BLOB_UPDATE_REQ.  Note that the blob chunk gap database contains
+ * descriptions of the blob chunks that we are expecting to arrive.
+ *
+ * When no master is known a REP_MASTER_REQ is broadcast and nothing else
+ * is done.
+ *
+ * Assumes the caller holds mtx_clientdb and rep_mutex.
+ *
+ * PUBLIC: int __rep_blob_rereq __P((ENV *, REP *));
+ */
+int
+__rep_blob_rereq(env, rep)
+	ENV *env;
+	REP *rep;
+{
+	DB_REP *db_rep;
+	DB_THREAD_INFO *ip;
+	REGINFO *infop;
+	__rep_fileinfo_args *curinfo;
+	db_seq_t blob_fid;
+	int master, ret;
+	u_int32_t ntruncated;
+
+	ret = 0;
+	curinfo = NULL;
+	infop = env->reginfo;
+	db_rep = env->rep_handle;
+
+	/* First check if the master is around to answer the re-request. */
+	if ((master = rep->master_id) == DB_EID_INVALID) {
+		(void)__rep_send_message(env,
+		    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
+		return (ret);
+	}
+
+	/* Open the blob gap database if this handle has not done so yet. */
+	if (db_rep->blob_dbp == NULL) {
+		ret = __rep_client_dbinit(env, 0, REP_BLOB);
+		if (ret != 0) {
+			RPRINT(env, (env, DB_VERB_REP_SYNC,
+			    "REP_BLOB_CHUNK: Client_dbinit %s",
+			    db_strerror(ret)));
+			return (ret);
+		}
+	}
+
+	/*
+	 * If the gap blob id is 0 then we either lost a REP_BLOB_ALL_REQ or
+	 * a REP_BLOB_UPDATE_REQ message.  Since we do not have the information
+	 * to reconstruct a REP_BLOB_ALL_REQ message, reset the blob gap
+	 * database and start over at the REP_BLOB_UPDATE_REQ stage.
+	 *
+	 * If the blob gap id is not 0, we lost a REP_BLOB_CHUNK_REQ message,
+	 * so perform blob gap processing.
+	 */
+	ENV_GET_THREAD_INFO(env, ip);
+	if (rep->gap_bl_hi_id == 0) {
+		/*
+		 * It takes a while to create the blob update message, so skip
+		 * the first time it asks.
+		 */
+		if (rep->blob_rereq == 0) {
+			rep->blob_rereq = 1;
+			return (ret);
+		}
+		rep->blob_rereq = 0;
+		ret = __db_truncate(db_rep->blob_dbp, ip, NULL, &ntruncated);
+		if (ret != 0)
+			return (ret);
+		/* Rewind to the ids saved before the lost request. */
+		rep->blob_more_files = 1;
+		rep->last_blob_id = rep->prev_blob_id;
+		rep->last_blob_sid = rep->prev_blob_sid;
+	}
+
+	GET_CURINFO(rep, infop, curinfo);
+	GET_LO_HI(env,
+	    curinfo->blob_fid_lo, curinfo->blob_fid_hi, blob_fid, ret);
+	if (ret != 0)
+		return (ret);
+
+	/*
+	 * If there are entries in the blob gap database, __rep_blobdone
+	 * will perform gap processing, otherwise it will send
+	 * a REP_BLOB_UPDATE_REQ.
+	 */
+	return (__rep_blobdone(env, master, ip, rep, blob_fid, 1));
+}
+
+/*
* __rep_finfo_alloc -
* Allocate and initialize a fileinfo structure.
*
@@ -3521,6 +5383,7 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg)
{
__rep_fileinfo_args *rfp, rf;
__rep_fileinfo_v6_args *rfpv6;
+ __rep_fileinfo_v7_args *rfpv7;
u_int8_t *next;
int ret;
void *rffree;
@@ -3530,21 +5393,30 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg)
rfpv6 = NULL;
rffree = NULL;
while (count-- > 0) {
+ /*
+ * Build a current struct by copying in the older
+ * version struct and then setting up the new fields.
+ * This is safe because all old fields are in the
+ * same location in the current struct.
+ */
if (version < DB_REPVERSION_53) {
- /*
- * Build a current struct by copying in the older
- * version struct and then setting up the data_dir.
- * This is safe because all old fields are in the
- * same location in the current struct.
- */
if ((ret = __rep_fileinfo_v6_unmarshal(env, version,
&rfpv6, files, size, &next)) != 0)
break;
memcpy(&rf, rfpv6, sizeof(__rep_fileinfo_v6_args));
rf.dir.data = NULL;
rf.dir.size = 0;
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
rfp = &rf;
rffree = rfpv6;
+ } else if (version < DB_REPVERSION_61) {
+ if ((ret = __rep_fileinfo_v7_unmarshal(env, version,
+ &rfpv7, files, size, &next)) != 0)
+ break;
+ memcpy(&rf, rfpv7, sizeof(__rep_fileinfo_v7_args));
+ rf.blob_fid_lo = rf.blob_fid_hi = 0;
+ rfp = &rf;
+ rffree = rfpv7;
} else {
if ((ret = __rep_fileinfo_unmarshal(env, version,
&rfp, files, size, &next)) != 0)
@@ -3566,3 +5438,33 @@ __rep_walk_filelist(env, version, files, size, count, fn, arg)
__os_free(env, rffree);
return (ret);
}
+
+/*
+ * __rep_init_file_list_context --
+ *	Set up a FILE_LIST_CTX with a freshly allocated, zero-filled
+ *	MEGABYTE buffer and an empty file list.
+ *
+ *	Pass in a non-zero value for update_space to reserve space for
+ *	update_args in the context's buffer; the fill pointer then starts
+ *	at FIRST_FILE_PTR instead of the buffer base.
+ *
+ *	Returns 0 on success or the __os_calloc error, in which case the
+ *	remaining context fields are not set by this function.
+ */
+static int
+__rep_init_file_list_context(env, version, flags, update_space, context)
+	ENV *env;
+	u_int32_t version;
+	u_int32_t flags;
+	int update_space;
+	FILE_LIST_CTX *context;
+{
+	int ret;
+
+	ret = __os_calloc(env, 1, MEGABYTE, &context->buf);
+	if (ret != 0)
+		return (ret);
+
+	context->version = version;
+	context->flags = flags;
+	context->count = 0;
+	context->size = MEGABYTE;
+	/* Reserve space for update_args. */
+	context->fillptr = update_space ?
+	    FIRST_FILE_PTR(context->buf) : context->buf;
+	return (0);
+}