summaryrefslogtreecommitdiff
path: root/src/repmgr/repmgr_method.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/repmgr/repmgr_method.c')
-rw-r--r--src/repmgr/repmgr_method.c954
1 files changed, 706 insertions, 248 deletions
diff --git a/src/repmgr/repmgr_method.c b/src/repmgr/repmgr_method.c
index 229cf650..729ba5ff 100644
--- a/src/repmgr/repmgr_method.c
+++ b/src/repmgr/repmgr_method.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -29,19 +29,17 @@ static int get_channel_connection __P((CHANNEL *, REPMGR_CONNECTION **));
static int init_dbsite __P((ENV *, int, const char *, u_int, DB_SITE **));
static int join_group_at_site __P((ENV *, repmgr_netaddr_t *));
static int kick_blockers __P((ENV *, REPMGR_CONNECTION *, void *));
-static int make_request_conn __P((ENV *,
- repmgr_netaddr_t *, REPMGR_CONNECTION **));
static int set_local_site __P((DB_SITE *, u_int32_t));
-static int read_own_msg __P((ENV *,
- REPMGR_CONNECTION *, u_int32_t *, u_int8_t **, size_t *));
static int refresh_site __P((DB_SITE *));
static int __repmgr_await_threads __P((ENV *));
static int __repmgr_build_data_out __P((ENV *,
DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp));
static int __repmgr_build_msg_out __P((ENV *,
DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp));
+static int __repmgr_demote_site(ENV *, int);
static int repmgr_only __P((ENV *, const char *));
static int __repmgr_restart __P((ENV *, int, u_int32_t));
+static int __repmgr_remove_and_close_site __P((DB_SITE *));
static int __repmgr_remove_site __P((DB_SITE *));
static int __repmgr_remove_site_pp __P((DB_SITE *));
static int __repmgr_start_msg_threads __P((ENV *, u_int));
@@ -52,25 +50,21 @@ static int send_msg_self __P((ENV *, REPMGR_IOVECS *, u_int32_t));
static int site_by_addr __P((ENV *, const char *, u_int, DB_SITE **));
/*
- * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
+ * PUBLIC: int __repmgr_start_pp __P((DB_ENV *, int, u_int32_t));
*/
int
-__repmgr_start(dbenv, nthreads, flags)
+__repmgr_start_pp(dbenv, nthreads, flags)
DB_ENV *dbenv;
int nthreads;
u_int32_t flags;
{
DB_REP *db_rep;
- REP *rep;
- REPMGR_SITE *me, *site;
- DB_THREAD_INFO *ip;
ENV *env;
- int first, is_listener, locked, min, need_masterseek, ret, start_master;
- u_int i, n;
+ DB_THREAD_INFO *ip;
+ int ret;
env = dbenv->env;
db_rep = env->rep_handle;
- rep = db_rep->region;
switch (flags) {
case 0:
@@ -102,7 +96,27 @@ __repmgr_start(dbenv, nthreads, flags)
return (EINVAL);
}
- /* Check if it is a shut-down site, if so, clean the resources. */
+ /* A view site cannot be started as MASTER or ELECTION. */
+ if (IS_VIEW_SITE(env) &&
+ (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) {
+ __db_errx(env, DB_STR("3694",
+ "A view site must be started with DB_REP_CLIENT"));
+ return (EINVAL);
+ }
+
+ /* Must start site as client in preferred master mode. */
+ if (PREFMAS_IS_SET(env) &&
+ (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) {
+ __db_errx(env, DB_STR("3702",
+ "A preferred master site must be started with "
+ "DB_REP_CLIENT"));
+ return (EINVAL);
+ }
+
+ /*
+ * Check whether this is a shut-down site; if so, clean up its resources
+ * and reset the status in order to get ready to start replication.
+ */
if (db_rep->repmgr_status == stopped) {
if ((ret = __repmgr_stop(env)) != 0) {
__db_errx(env, DB_STR("3638",
@@ -112,7 +126,55 @@ __repmgr_start(dbenv, nthreads, flags)
db_rep->repmgr_status = ready;
}
+ /* Record the original configurations given by application. */
+ ENV_ENTER(env, ip);
db_rep->init_policy = flags;
+ db_rep->config_nthreads = nthreads;
+ ret = __repmgr_start_int(env, nthreads, flags);
+ ENV_LEAVE(env, ip);
+ return (ret);
+}
+
+/*
+ * Internal processing to start replication manager.
+ *
+ * PUBLIC: int __repmgr_start_int __P((ENV *, int, u_int32_t));
+ */
+int
+__repmgr_start_int(env, nthreads, flags)
+ ENV *env;
+ int nthreads;
+ u_int32_t flags;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ LOG *lp;
+ REP *rep;
+ REPMGR_SITE *me, *site;
+ u_int32_t startopts;
+ int first, flags_error, is_listener, locked, min;
+ int need_masterseek, ret, start_master;
+ u_int i, n;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+ flags_error = 0;
+ startopts = 0;
+
+ /*
+ * For startup of the master site in preferred master mode, we need to
+ * save the log location at the end of our previous transactions for
+ * the lsnhist_match comparisons. Starting repmgr adds a few
+ * more log records that we don't want to count in lsnhist_match.
+ */
+ if (FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) {
+ LOG_SYSTEM_LOCK(env);
+ db_rep->prefmas_init_lsn = lp->lsn;
+ LOG_SYSTEM_UNLOCK(env);
+ }
+
if ((ret = __rep_set_transport_int(env,
db_rep->self_eid, __repmgr_send)) != 0)
return (ret);
@@ -128,7 +190,8 @@ __repmgr_start(dbenv, nthreads, flags)
if (db_rep->restored_list != NULL) {
ret = __repmgr_refresh_membership(env,
- db_rep->restored_list, db_rep->restored_list_length);
+ db_rep->restored_list, db_rep->restored_list_length,
+ DB_REPMGR_VERSION);
__os_free(env, db_rep->restored_list);
db_rep->restored_list = NULL;
} else {
@@ -145,9 +208,15 @@ __repmgr_start(dbenv, nthreads, flags)
* join.
*/
ret = __repmgr_join_group(env);
+ else if (VIEW_TO_PARTICIPANT(db_rep, me)) {
+ __db_errx(env, DB_STR("3695",
+ "A view site must be started with a view callback"));
+ return (EINVAL);
+ }
} else if (ret == ENOENT) {
- ENV_ENTER(env, ip);
- if (FLD_ISSET(me->config, DB_GROUP_CREATOR))
+ if (FLD_ISSET(me->config, DB_GROUP_CREATOR) ||
+ (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)))
start_master = TRUE;
/*
* LEGACY is inconsistent with CREATOR, but start_master
@@ -166,10 +235,12 @@ __repmgr_start(dbenv, nthreads, flags)
continue;
if ((ret = __repmgr_set_membership(env,
site->net_addr.host,
- site->net_addr.port,
- SITE_PRESENT)) != 0)
+ site->net_addr.port, SITE_PRESENT,
+ site->gmdb_flags)) != 0)
break;
- n++;
+ if (!FLD_ISSET(site->gmdb_flags,
+ SITE_VIEW))
+ n++;
}
ret = __rep_set_nsites_int(env, n);
DB_ASSERT(env, ret == 0);
@@ -180,30 +251,27 @@ __repmgr_start(dbenv, nthreads, flags)
db_rep->member_version_gen = 1;
if ((ret = __repmgr_set_membership(env,
me->net_addr.host, me->net_addr.port,
- SITE_PRESENT)) == 0) {
+ SITE_PRESENT, 0)) == 0) {
ret = __rep_set_nsites_int(env, 1);
DB_ASSERT(env, ret == 0);
}
UNLOCK_MUTEX(db_rep->mutex);
} else
ret = __repmgr_join_group(env);
- ENV_LEAVE(env, ip);
} else if (ret == DB_DELETED)
ret = DB_REP_UNAVAIL;
}
if (ret != 0)
return (ret);
- DB_ASSERT(env, start_master ||
- SITE_FROM_EID(db_rep->self_eid)->membership == SITE_PRESENT);
-
/*
- * If we're the first repmgr_start() call, we will have to start threads.
- * Therefore, we require a flags value (to tell us how).
+ * Catch case where user defines a different local site address than
+ * the one in the restored_list from an ongoing internal init.
*/
- if (db_rep->repmgr_status != running && flags == 0) {
- __db_errx(env, DB_STR("3639",
- "a non-zero flags value is required for initial repmgr_start() call"));
+ if (!start_master &&
+ SITE_FROM_EID(db_rep->self_eid)->membership != SITE_PRESENT) {
+ __db_errx(env, DB_STR("3696",
+ "Current local site conflicts with earlier definition"));
return (EINVAL);
}
@@ -214,37 +282,54 @@ __repmgr_start(dbenv, nthreads, flags)
*
* Then, in case there could be multiple processes, we're either the
* main listener process or a subordinate process. On a "subsequent"
- * repmgr_start() call we already have enough information to know which
- * it is. Otherwise, negotiate with information in the shared region to
- * claim the listener role if possible.
+ * repmgr_start() call, with a running main listener process, we already
+ * have enough information to know which it is. Otherwise, if there is
+ * no listener, negotiate with information in the shared region to claim
+ * the listener role if possible. Once we decide we're the listener,
+ * mark the listener id in the shared region, so that no other process
+ * thinks the same thing.
*
* To avoid a race, once we decide we're in the first call, mark the
* handle as started, so that no other thread thinks the same thing.
*/
+ first = FALSE;
+ is_listener = FALSE;
LOCK_MUTEX(db_rep->mutex);
locked = TRUE;
- if (db_rep->repmgr_status == running) {
- first = FALSE;
+ if (db_rep->repmgr_status == running && !(rep->listener == 0 &&
+ FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER)))
is_listener = !IS_SUBORDINATE(db_rep);
- } else {
+ else if (db_rep->repmgr_status != running &&
+ rep->listener == 0 && flags == 0)
+ flags_error = 1;
+ else {
first = TRUE;
db_rep->repmgr_status = running;
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
if (rep->listener == 0) {
is_listener = TRUE;
- __os_id(dbenv, &rep->listener, NULL);
- } else {
- is_listener = FALSE;
+ __os_id(env->dbenv, &rep->listener, NULL);
+ } else
nthreads = 0;
- }
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
}
UNLOCK_MUTEX(db_rep->mutex);
locked = FALSE;
+ /*
+ * The first repmgr_start() call for the main listener process
+ * requires a flags value to tell us how to start up the site.
+ * But we don't require a flags value for the repmgr_start()
+ * call for a subordinate process because the site is already
+ * started and we would only ignore the value anyway.
+ */
+ if (flags_error) {
+ __db_errx(env, DB_STR("3639",
+ "A non-zero flags value is required for initial repmgr_start() call"));
+ return (EINVAL);
+ }
+
if (!first) {
/*
* Subsequent call is allowed when ELECTIONS are turned off, so
@@ -266,7 +351,7 @@ __repmgr_start(dbenv, nthreads, flags)
/*
* The minimum legal number of threads is either 1 or 0, depending upon
- * whether we're the main process or a subordinate.
+ * whether we're the listener process or a subordinate.
*/
min = is_listener ? 1 : 0;
if (nthreads < min) {
@@ -303,14 +388,24 @@ __repmgr_start(dbenv, nthreads, flags)
* of rep_start calls even within an env region lifetime.
*/
if (start_master) {
- ret = __repmgr_become_master(env);
+ ret = __repmgr_become_master(env, 0);
/* No other repmgr threads running yet. */
DB_ASSERT(env, ret != DB_REP_UNAVAIL);
if (ret != 0)
goto err;
need_masterseek = FALSE;
} else {
- if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0)
+ /*
+ * The preferred master site cannot allow its gen
+ * to change until it has done its lsnhist_match to
+ * guarantee that no preferred master transactions
+ * will be rolled back.
+ */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER))
+ startopts = REP_START_HOLD_CLIGEN;
+ if ((ret = __repmgr_repstart(env,
+ DB_REP_CLIENT, startopts)) != 0)
goto err;
/*
* The repmgr election code starts elections only if
@@ -352,6 +447,7 @@ __repmgr_start(dbenv, nthreads, flags)
if ((ret =
__repmgr_start_msg_threads(env, (u_int)nthreads)) != 0)
goto err;
+ rep->listener_nthreads = (u_int)nthreads;
if (need_masterseek) {
/*
@@ -374,10 +470,47 @@ __repmgr_start(dbenv, nthreads, flags)
}
UNLOCK_MUTEX(db_rep->mutex);
locked = FALSE;
+ /*
+ * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only
+ * do this for the main listener process. For a subordinate
+ * process, it is always turned on.
+ */
+ rep->inqueue_full_event_on = 1;
+ }
+ if (db_rep->selector == NULL) {
+ /* All processes (even non-listeners) need a select() thread. */
+ if ((ret = __repmgr_start_selector(env)) == 0) {
+ /*
+ * A view callback is set but this site isn't yet a
+ * view in the internal site list. Do the view
+ * demotion here, which will update the internal
+ * site list. We need the select() thread for the
+ * demotion because the demotion performs gmdb
+ * operations.
+ */
+ if (PARTICIPANT_TO_VIEW(db_rep,
+ SITE_FROM_EID(db_rep->self_eid)) &&
+ (ret = __repmgr_demote_site(env,
+ db_rep->self_eid)) != 0)
+ goto err;
+ return (is_listener ? 0 : DB_REP_IGNORE);
+ }
+ } else {
+ /*
+ * If the selector thread already exists, the current process
+ * should be the new listener which has just finished a
+ * takeover. Now, all active connections need to be refreshed
+ * to notify remote sites about the new listener. If a new
+ * connection is established immediately, disable the existing
+ * main connection to the same site. Otherwise, schedule a
+ * second immediate attempt. If it still fails, disable the
+ * main connection and retry a connection as usual.
+ */
+ DB_ASSERT(env, is_listener &&
+ FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER));
+ if ((ret = __repmgr_refresh_selector(env)) == 0)
+ return (0);
}
- /* All processes (even non-listeners) need a select() thread. */
- if ((ret = __repmgr_start_selector(env)) == 0)
- return (is_listener ? 0 : DB_REP_IGNORE);
err:
/* If we couldn't succeed at everything, undo the parts we did do. */
@@ -392,6 +525,16 @@ err:
if (!locked)
LOCK_MUTEX(db_rep->mutex);
(void)__repmgr_net_close(env);
+ /* Reset the listener when we fail before having a valid listen_fd. */
+ if (first && is_listener)
+ rep->listener = 0;
+ /*
+ * Reset repmgr_status when we fail before starting a selector if the
+ * earlier call to __repmgr_stop_threads() hasn't already reset it to
+ * stopped.
+ */
+ if (db_rep->repmgr_status == running)
+ db_rep->repmgr_status = ready;
UNLOCK_MUTEX(db_rep->mutex);
return (ret);
}
@@ -425,6 +568,53 @@ __repmgr_valid_config(env, flags)
}
/*
+ * Set priority, heartbeat and election_retry timeouts for preferred master
+ * mode. Turn on 2SITE_STRICT and ELECTIONS. Can be called whether or not
+ * REP_ON() is true.
+ *
+ * Existing application settings are preserved: each heartbeat timeout is
+ * changed only if it currently reads back as 0, and the election_retry
+ * timeout only if it still equals DB_REPMGR_DEFAULT_ELECTION_RETRY.
+ *
+ * dbenv: environment handle whose replication settings are adjusted.
+ * config_flags: in/out flag word. REP_C_PREFMAS_MASTER is read from it to
+ * choose between DB_REPMGR_PREFMAS_PRIORITY_MASTER and
+ * DB_REPMGR_PREFMAS_PRIORITY_CLIENT, and REP_C_ELECTIONS |
+ * REP_C_2SITE_STRICT are set in it once every setting has succeeded.
+ *
+ * Returns 0 on success, or the non-zero value returned by a failing
+ * __rep_set_timeout_int() or __rep_set_priority_int() call. A non-zero
+ * return from __rep_get_timeout() is not propagated; it only causes the
+ * corresponding timeout to be left alone.
+ *
+ * PUBLIC: int __repmgr_prefmas_auto_config __P((DB_ENV *, u_int32_t *));
+ */
+int __repmgr_prefmas_auto_config (dbenv, config_flags)
+ DB_ENV *dbenv;
+ u_int32_t *config_flags;
+{
+ ENV * env;
+ db_timeout_t timeout;
+ int ret;
+
+ env = dbenv->env;
+ timeout = 0;
+
+ /*
+ * Change heartbeat timeouts if they are not already set (a current
+ * value of 0 is treated as not set). The monitor and send timeouts
+ * are checked and updated independently of each other.
+ */
+ if ((ret = __rep_get_timeout(dbenv,
+ DB_REP_HEARTBEAT_MONITOR, &timeout)) == 0 &&
+ timeout == 0 && (ret = __rep_set_timeout_int(env,
+ DB_REP_HEARTBEAT_MONITOR,
+ DB_REPMGR_PREFMAS_HEARTBEAT_MONITOR)) != 0)
+ return (ret);
+ if ((ret = __rep_get_timeout(dbenv,
+ DB_REP_HEARTBEAT_SEND, &timeout)) == 0 &&
+ timeout == 0 && (ret = __rep_set_timeout_int(env,
+ DB_REP_HEARTBEAT_SEND, DB_REPMGR_PREFMAS_HEARTBEAT_SEND)) != 0)
+ return (ret);
+
+ /* Change election_retry timeout if it is still the default value. */
+ if ((ret = __rep_get_timeout(dbenv,
+ DB_REP_ELECTION_RETRY, &timeout)) == 0 &&
+ timeout == DB_REPMGR_DEFAULT_ELECTION_RETRY &&
+ (ret = __rep_set_timeout_int(env,
+ DB_REP_ELECTION_RETRY, DB_REPMGR_PREFMAS_ELECTION_RETRY)) != 0)
+ return (ret);
+
+ if ((ret = __rep_set_priority_int(env, FLD_ISSET(*config_flags,
+ REP_C_PREFMAS_MASTER) ? DB_REPMGR_PREFMAS_PRIORITY_MASTER :
+ DB_REPMGR_PREFMAS_PRIORITY_CLIENT)) != 0)
+ return (ret);
+ FLD_SET(*config_flags, REP_C_ELECTIONS | REP_C_2SITE_STRICT);
+ return (0);
+}
+
+/*
* Starts message processing threads. On entry, the actual number of threads
* already active is db_rep->nthreads; the desired number of threads is passed
* as "n".
@@ -473,7 +663,7 @@ __repmgr_restart(env, nthreads, flags)
REP *rep;
REPMGR_RUNNABLE **th;
u_int32_t cur_repflags;
- int locked, ret, t_ret;
+ int locked, ret, role_change, t_ret;
u_int delta, i, min, nth;
th = NULL;
@@ -491,6 +681,7 @@ __repmgr_restart(env, nthreads, flags)
}
ret = 0;
+ role_change = 0;
db_rep = env->rep_handle;
DB_ASSERT(env, REP_ON(env));
rep = db_rep->region;
@@ -498,11 +689,14 @@ __repmgr_restart(env, nthreads, flags)
cur_repflags = F_ISSET(rep, REP_F_MASTER | REP_F_CLIENT);
DB_ASSERT(env, cur_repflags);
if (FLD_ISSET(cur_repflags, REP_F_MASTER) &&
- flags == DB_REP_CLIENT)
+ flags == DB_REP_CLIENT) {
ret = __repmgr_become_client(env);
- else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) &&
- flags == DB_REP_MASTER)
- ret = __repmgr_become_master(env);
+ role_change = 1;
+ } else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) &&
+ flags == DB_REP_MASTER) {
+ ret = __repmgr_become_master(env, 0);
+ role_change = 1;
+ }
if (ret != 0)
return (ret);
@@ -574,6 +768,9 @@ __repmgr_restart(env, nthreads, flags)
}
__os_free(env, th);
}
+ /* We will always turn on the inqueue full event after role change. */
+ if (role_change)
+ rep->inqueue_full_event_on = 1;
out: if (locked)
UNLOCK_MUTEX(db_rep->mutex);
@@ -668,7 +865,8 @@ __repmgr_start_selector(env)
* PUBLIC: int __repmgr_close __P((ENV *));
*
* Close repmgr during env close. It stops repmgr, frees sites array and
- * its addresses.
+ * its addresses. Note that it is possible for the sites array to exist
+ * and require deallocation independently of whether repmgr was started.
*/
int
__repmgr_close(env)
@@ -679,10 +877,15 @@ __repmgr_close(env)
int ret;
u_int i;
- db_rep = env->rep_handle;
+ if ((db_rep = env->rep_handle) == NULL)
+ return (0);
ret = 0;
- ret = __repmgr_stop(env);
+ /* Stop repmgr and all of its threads if it was previously started. */
+ if (IS_ENV_REPLICATED(env))
+ ret = __repmgr_stop(env);
+
+ /* Clean up sites array regardless of whether we could stop repmgr. */
if (db_rep->sites != NULL) {
for (i = 0; i < db_rep->site_cnt; i++) {
site = &db_rep->sites[i];
@@ -756,9 +959,9 @@ __repmgr_set_ack_policy(dbenv, policy)
DB_ENV *dbenv;
int policy;
{
+ ENV *env;
DB_REP *db_rep;
DB_THREAD_INFO *ip;
- ENV *env;
REP *rep;
int ret;
@@ -823,6 +1026,208 @@ __repmgr_get_ack_policy(dbenv, policy)
}
/*
+ * PUBLIC: int __repmgr_set_incoming_queue_max __P((DB_ENV *, u_int32_t,
+ * PUBLIC: u_int32_t));
+ *
+ * Sets the maximum amount of dynamic memory used by the Replication Manager
+ * incoming queue.
+ *
+ * The limit is given as gbytes gigabytes plus bytes bytes. The byte count
+ * is normalized to be less than GIGABYTE, with each whole GIGABYTE carried
+ * into gbytes; gbytes saturates at UINT32_MAX rather than wrapping.
+ *
+ * When REP_ON(env) is true the values are written to the shared region
+ * under mtx_repmgr and the red zone is recomputed from them; otherwise
+ * they are only recorded in the DB_REP handle.
+ *
+ * Returns EINVAL (after reporting an error) when called from a base
+ * replication API application, otherwise 0.
+ * NOTE(review): ENV_NOT_CONFIGURED is defined elsewhere and may also
+ * return early from this function -- confirm its exact condition.
+ */
+int
+__repmgr_set_incoming_queue_max(dbenv, gbytes, bytes)
+ DB_ENV *dbenv;
+ u_int32_t gbytes, bytes;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->repmgr_set_incoming_queue_max",
+ DB_INIT_REP);
+
+ if (APP_IS_BASEAPI(env)) {
+ __db_errx(env, "%s %s",
+ "DB_ENV->repmgr_set_incoming_queue_max:",
+ "cannot call from base replication application");
+ return (EINVAL);
+ }
+
+ /*
+ * If the caller provided 0 for the size, the size will be unlimited.
+ * "Unlimited" is represented by the largest storable pair:
+ * UINT32_MAX gigabytes plus GIGABYTE - 1 bytes.
+ */
+ if (gbytes == 0 && bytes == 0) {
+ gbytes = UINT32_MAX;
+ bytes = GIGABYTE - 1;
+ }
+
+ while (bytes >= GIGABYTE) {
+ bytes -= GIGABYTE;
+ if (gbytes < UINT32_MAX)
+ gbytes++;
+ }
+
+ if (REP_ON(env)) {
+ ENV_ENTER(env, ip);
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ rep->inqueue_max_gbytes = gbytes;
+ rep->inqueue_max_bytes = bytes;
+ __repmgr_set_incoming_queue_redzone(rep, gbytes, bytes);
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ ENV_LEAVE(env, ip);
+ } else {
+ db_rep->inqueue_max_gbytes = gbytes;
+ db_rep->inqueue_max_bytes = bytes;
+ }
+
+ /*
+ * Setting incoming queue maximum sizes makes this a replication
+ * manager application.
+ */
+ APP_SET_REPMGR(env);
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_get_incoming_queue_max __P((DB_ENV *, u_int32_t *,
+ * PUBLIC: u_int32_t *));
+ *
+ * Gets the maximum amount of dynamic memory that can be used by the
+ * Replication Manager incoming queue.
+ *
+ * The limit is returned through gbytesp (gigabytes) and bytesp (additional
+ * bytes). When REP_ON(env) is true the values are read from the shared
+ * region under mtx_repmgr; otherwise the values recorded in the DB_REP
+ * handle are returned. Always returns 0.
+ */
+int
+__repmgr_get_incoming_queue_max(dbenv, gbytesp, bytesp)
+ DB_ENV *dbenv;
+ u_int32_t *gbytesp, *bytesp;
+{
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ DB_REP *db_rep;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ if (REP_ON(env)) {
+ ENV_ENTER(env, ip);
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ *gbytesp = rep->inqueue_max_gbytes;
+ *bytesp = rep->inqueue_max_bytes;
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ ENV_LEAVE(env, ip);
+ } else {
+ *gbytesp = db_rep->inqueue_max_gbytes;
+ *bytesp = db_rep->inqueue_max_bytes;
+ }
+
+ return (0);
+}
+
+/*
+ * PUBLIC: void __repmgr_set_incoming_queue_redzone __P((void *, u_int32_t,
+ * PUBLIC: u_int32_t));
+ *
+ * Sets the lower bound of the repmgr incoming queue red zone.
+ * !!! Assumes caller holds mtx_repmgr lock.
+ *
+ * The bound is DB_REPMGR_INQUEUE_REDZONE_PERCENT percent of the size given
+ * as gbytes gigabytes plus bytes bytes. The result is stored, truncated to
+ * unsigned integers, in rep->inqueue_rz_gbytes and rep->inqueue_rz_bytes.
+ *
+ * Note that we can't simply get the REP* address from the env as we usually do,
+ * because at the time of this call it may not have been linked into there yet.
+ * Also note that REP is not a public structure, so we use "void *" here.
+ */
+void __repmgr_set_incoming_queue_redzone(rep_, gbytes, bytes)
+ void *rep_;
+ u_int32_t gbytes, bytes;
+{
+ REP *rep;
+ double rdgbytes, rdbytes;
+
+ rep = rep_;
+
+ /*
+ * We use 'double' values to do the computation for precision, and
+ * to avoid overflow.
+ *
+ * The fractional part of the gigabyte result is converted into bytes
+ * and added to the percentage of the byte count. If that sum reaches
+ * a full GIGABYTE, one gigabyte is carried back into the gigabyte
+ * count.
+ */
+ rdgbytes = gbytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00;
+ rdbytes = (rdgbytes - (u_int32_t)rdgbytes) * GIGABYTE;
+ rdbytes += bytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00;
+ if (rdbytes >= GIGABYTE) {
+ rdgbytes += 1;
+ rdbytes -= GIGABYTE;
+ }
+ rep->inqueue_rz_gbytes = (u_int32_t)rdgbytes;
+ rep->inqueue_rz_bytes = (u_int32_t)rdbytes;
+}
+
+/*
+ * PUBLIC: int __repmgr_get_incoming_queue_redzone __P((DB_ENV *,
+ * PUBLIC: u_int32_t *, u_int32_t *));
+ *
+ * Gets the lower bound of the repmgr incoming queue red zone.
+ * This method must be called after environment open.
+ *
+ * The bound is read from the shared region under mtx_repmgr and returned
+ * through gbytesp (gigabytes) and bytesp (additional bytes). Returns 0
+ * once the values have been copied out.
+ * NOTE(review): ENV_REQUIRES_CONFIG is defined elsewhere; presumably it
+ * fails the call when replication is not configured -- confirm.
+ */
+int __repmgr_get_incoming_queue_redzone(dbenv, gbytesp, bytesp)
+ DB_ENV *dbenv;
+ u_int32_t *gbytesp, *bytesp;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ ENV_REQUIRES_CONFIG(
+ env, db_rep->region, "__repmgr_get_incoming_queue_redzone",
+ DB_INIT_REP);
+
+ ENV_ENTER(env, ip);
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ *gbytesp = rep->inqueue_rz_gbytes;
+ *bytesp = rep->inqueue_rz_bytes;
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ ENV_LEAVE(env, ip);
+
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_get_incoming_queue_fullevent __P((DB_ENV *,
+ * PUBLIC: int *));
+ *
+ * Return whether the DB_EVENT_REP_INQUEUE_FULL event firing is
+ * turned on or off.
+ * This method must be called after environment open.
+ *
+ * *onoffp is set to 1 when rep->inqueue_full_event_on is non-zero and to 0
+ * otherwise. The field is read directly from the shared region; unlike
+ * the other incoming queue getters, no mutex is acquired here. Returns 0
+ * once the value has been copied out.
+ * NOTE(review): ENV_REQUIRES_CONFIG is defined elsewhere; presumably it
+ * fails the call when replication is not configured -- confirm.
+ */
+int __repmgr_get_incoming_queue_fullevent(dbenv, onoffp)
+ DB_ENV *dbenv;
+ int *onoffp;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ ENV_REQUIRES_CONFIG(
+ env, db_rep->region,
+ "DB_ENV->__repmgr_get_incoming_queue_fullevent",
+ DB_INIT_REP);
+
+ *onoffp = rep->inqueue_full_event_on ? 1 : 0;
+
+ return (0);
+}
+
+/*
* PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *));
*/
int
@@ -837,7 +1242,13 @@ __repmgr_env_create(env, db_rep)
db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
db_rep->config_nsites = 0;
+ ADJUST_AUTOTAKEOVER_WAITS(db_rep, DB_REPMGR_DEFAULT_ACK_TIMEOUT);
db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+ db_rep->inqueue_max_gbytes = 0;
+ db_rep->inqueue_max_bytes = 0;
+#ifdef HAVE_REPLICATION_LISTENER_TAKEOVER
+ FLD_SET(db_rep->config, REP_C_AUTOTAKEOVER);
+#endif
FLD_SET(db_rep->config, REP_C_ELECTIONS);
FLD_SET(db_rep->config, REP_C_2SITE_STRICT);
@@ -846,7 +1257,8 @@ __repmgr_env_create(env, db_rep)
TAILQ_INIT(&db_rep->connections);
TAILQ_INIT(&db_rep->retries);
- db_rep->input_queue.size = 0;
+ db_rep->input_queue.gbytes = 0;
+ db_rep->input_queue.bytes = 0;
STAILQ_INIT(&db_rep->input_queue.header);
__repmgr_env_create_pf(db_rep);
@@ -944,6 +1356,15 @@ __repmgr_await_threads(env)
* of a connector thread.
*/
+ /* Takeover thread. */
+ if (db_rep->takeover_thread != NULL) {
+ if ((t_ret = __repmgr_thread_join(db_rep->takeover_thread)) !=
+ 0 && ret == 0)
+ ret = t_ret;
+ __os_free(env, db_rep->takeover_thread);
+ db_rep->takeover_thread = NULL;
+ }
+
/* Message processing threads. */
for (i = 0;
i < db_rep->nthreads && db_rep->messengers[i] != NULL; i++) {
@@ -1178,7 +1599,7 @@ get_shared_netaddr(env, eid, netaddr)
MUTEX_LOCK(env, rep->mtx_repmgr);
if ((u_int)eid >= rep->site_cnt) {
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto err;
}
DB_ASSERT(env, rep->siteinfo_off != INVALID_ROFF);
@@ -1423,7 +1844,7 @@ send_msg_self(env, iovecs, nmsg)
u_int32_t nmsg;
{
REPMGR_MESSAGE *msg;
- size_t align, bodysize, structsize;
+ size_t align, bodysize, msgsize, structsize;
u_int8_t *membase;
int ret;
@@ -1431,10 +1852,12 @@ send_msg_self(env, iovecs, nmsg)
bodysize = iovecs->total_bytes - __REPMGR_MSG_HDR_SIZE;
structsize = (size_t)DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) +
nmsg * sizeof(DBT)), align);
- if ((ret = __os_malloc(env, structsize + bodysize, &membase)) != 0)
+ msgsize = structsize + bodysize;
+ if ((ret = __os_malloc(env, msgsize, &membase)) != 0)
return (ret);
msg = (void*)membase;
+ msg->size = msgsize;
membase += structsize;
/*
@@ -1616,13 +2039,14 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
}
ENV_ENTER(env, ip);
- ret = get_channel_connection(channel, &conn);
- ENV_LEAVE(env, ip);
- if (ret != 0)
- return (ret);
+ if ((ret = get_channel_connection(channel, &conn)) != 0)
+ goto out;
- if (conn == NULL)
- return (request_self(env, request, nrequest, response, flags));
+ /* If conn is NULL, call request_self and then we are done here. */
+ if (conn == NULL) {
+ ret = request_self(env, request, nrequest, response, flags);
+ goto out;
+ }
/* Find an available array slot, or grow the array if necessary. */
LOCK_MUTEX(db_rep->mutex);
@@ -1670,7 +2094,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
LOCK_MUTEX(db_rep->mutex);
F_CLR(&conn->responses[i], RESP_IN_USE | RESP_THREAD_WAITING);
UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
+ goto out;
}
timeout = timeout > 0 ? timeout : db_channel->timeout;
@@ -1688,7 +2112,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
* to wake up those threads, with a COMPLETE indication and an
* error code. That's more than we want to tackle here.
*/
- return (ret);
+ goto out;
}
/*
@@ -1732,7 +2156,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
sz = conn->iovecs.vectors[0].iov_len;
if ((ret = __os_malloc(env, sz, &dummy)) != 0)
- goto out;
+ goto out_unlck;
__repmgr_iovec_init(&conn->iovecs);
DB_INIT_DBT(resp->dbt, dummy, sz);
__repmgr_add_dbt(&conn->iovecs, &resp->dbt);
@@ -1740,8 +2164,9 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
}
}
-out:
+out_unlck:
UNLOCK_MUTEX(db_rep->mutex);
+out: ENV_LEAVE(env, ip);
return (ret);
}
@@ -2168,6 +2593,7 @@ __repmgr_channel_close(dbchan, flags)
{
ENV *env;
DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
REPMGR_CONNECTION *conn;
CHANNEL *channel;
u_int32_t i;
@@ -2182,6 +2608,7 @@ __repmgr_channel_close(dbchan, flags)
* Disable connection(s) (if not already done due to an error having
* occurred previously); release our reference to conn struct(s).
*/
+ ENV_ENTER(env, ip);
LOCK_MUTEX(db_rep->mutex);
if (dbchan->eid >= 0) {
conn = channel->c.conn;
@@ -2218,6 +2645,7 @@ __repmgr_channel_close(dbchan, flags)
__os_free(env, channel);
__os_free(env, dbchan);
+ ENV_LEAVE(env, ip);
return (ret);
}
@@ -2369,29 +2797,26 @@ join_group_at_site(env, addrp)
repmgr_netaddr_t *addrp;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_CONNECTION *conn;
SITE_STRING_BUFFER addr_buf;
repmgr_netaddr_t addr, myaddr;
__repmgr_gm_fwd_args fwd;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
u_int8_t *p, *response_buf, siteinfo_buf[MAX_MSG_BUF];
char host_buf[MAXHOSTNAMELEN + 1], *host;
u_int32_t gen, type;
- size_t len;
+ size_t host_len, msg_len, req_len;
int ret, t_ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
LOCK_MUTEX(db_rep->mutex);
myaddr = SITE_FROM_EID(db_rep->self_eid)->net_addr;
UNLOCK_MUTEX(db_rep->mutex);
- len = strlen(myaddr.host) + 1;
- DB_INIT_DBT(site_info.host, myaddr.host, len);
- site_info.port = myaddr.port;
- site_info.flags = 0;
- ret = __repmgr_site_info_marshal(env,
- &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
- DB_ASSERT(env, ret == 0);
+ host_len = strlen(myaddr.host) + 1;
conn = NULL;
response_buf = NULL;
@@ -2399,14 +2824,35 @@ join_group_at_site(env, addrp)
RPRINT(env, (env, DB_VERB_REPMGR_MISC, "try join request to site %s",
__repmgr_format_addr_loc(addrp, addr_buf)));
retry:
- if ((ret = make_request_conn(env, addrp, &conn)) != 0)
+ if ((ret = __repmgr_make_request_conn(env, addrp, &conn)) != 0)
return (ret);
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
+ if (conn->version < 5) {
+ DB_INIT_DBT(v4site_info.host, myaddr.host, host_len);
+ v4site_info.port = myaddr.port;
+ v4site_info.flags = 0;
+ ret = __repmgr_v4site_info_marshal(env,
+ &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len);
+ } else {
+ DB_INIT_DBT(site_info.host, myaddr.host, host_len);
+ site_info.port = myaddr.port;
+ site_info.status = 0;
+ site_info.flags = 0;
+ if (IS_VIEW_SITE(env))
+ FLD_SET(site_info.flags, SITE_VIEW);
+ if (rep->priority > 0)
+ FLD_SET(site_info.flags, SITE_JOIN_ELECTABLE);
+ ret = __repmgr_site_info_marshal(env,
+ &site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len);
+ }
+ DB_ASSERT(env, ret == 0);
+ /* Preserve separate request length in case there is a retry. */
if ((ret = __repmgr_send_sync_msg(env, conn,
- REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0)
+ REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)req_len)) != 0)
goto err;
- if ((ret = read_own_msg(env,
- conn, &type, &response_buf, &len)) != 0)
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &msg_len)) != 0)
goto err;
if (type == REPMGR_GM_FAILURE) {
@@ -2429,7 +2875,7 @@ retry:
goto err;
ret = __repmgr_gm_fwd_unmarshal(env, &fwd,
- response_buf, len, &p);
+ response_buf, msg_len, &p);
DB_ASSERT(env, ret == 0);
if (fwd.gen > gen) {
if (fwd.host.size > MAXHOSTNAMELEN + 1) {
@@ -2456,7 +2902,8 @@ retry:
}
}
if (type == REPMGR_JOIN_SUCCESS)
- ret = __repmgr_refresh_membership(env, response_buf, len);
+ ret = __repmgr_refresh_membership(env, response_buf, msg_len,
+ conn->version);
else
ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */
@@ -2476,129 +2923,6 @@ err:
}
/*
- * Reads a whole message, when we expect to get a REPMGR_OWN_MSG.
- */
-static int
-read_own_msg(env, conn, typep, bufp, lenp)
- ENV *env;
- REPMGR_CONNECTION *conn;
- u_int32_t *typep;
- u_int8_t **bufp;
- size_t *lenp;
-{
- __repmgr_msg_hdr_args msg_hdr;
- u_int8_t *buf;
- u_int32_t type;
- size_t size;
- int ret;
-
- __repmgr_reset_for_reading(conn);
- if ((ret = __repmgr_read_conn(conn)) != 0)
- goto err;
- ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
- conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
- DB_ASSERT(env, ret == 0);
-
- if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) {
- ret = DB_REP_UNAVAIL; /* Protocol violation. */
- goto err;
- }
- type = REPMGR_OWN_MSG_TYPE(msg_hdr);
- if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) {
- conn->reading_phase = DATA_PHASE;
- __repmgr_iovec_init(&conn->iovecs);
-
- if ((ret = __os_malloc(env, size, &buf)) != 0)
- goto err;
- conn->input.rep_message = NULL;
-
- __repmgr_add_buffer(&conn->iovecs, buf, size);
- if ((ret = __repmgr_read_conn(conn)) != 0) {
- __os_free(env, buf);
- goto err;
- }
- *bufp = buf;
- }
-
- *typep = type;
- *lenp = size;
-
-err:
- return (ret);
-}
-
-static int
-make_request_conn(env, addr, connp)
- ENV *env;
- repmgr_netaddr_t *addr;
- REPMGR_CONNECTION **connp;
-{
- DBT vi;
- __repmgr_msg_hdr_args msg_hdr;
- __repmgr_version_confirmation_args conf;
- REPMGR_CONNECTION *conn;
- int alloc, ret, unused;
-
- alloc = FALSE;
- if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0)
- return (ret);
- conn->type = APP_CONNECTION;
-
- /* Read a handshake msg, to get version confirmation and parameters. */
- if ((ret = __repmgr_read_conn(conn)) != 0)
- goto err;
- /*
- * We can only get here after having read the full 9 bytes that we
- * expect, so this can't fail.
- */
- DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
- ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
- conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
- DB_ASSERT(env, ret == 0);
- __repmgr_iovec_init(&conn->iovecs);
- conn->reading_phase = DATA_PHASE;
-
- if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0)
- goto err;
- alloc = TRUE;
-
- if ((ret = __repmgr_read_conn(conn)) != 0)
- goto err;
-
- /*
- * Analyze the handshake msg, and stash relevant info.
- */
- if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
- goto err;
- DB_ASSERT(env, vi.size > 0);
- if ((ret = __repmgr_version_confirmation_unmarshal(env,
- &conf, vi.data, vi.size, NULL)) != 0)
- goto err;
-
- if (conf.version < GM_MIN_VERSION) {
- ret = DB_REP_UNAVAIL;
- goto err;
- }
- conn->version = conf.version;
-
-err:
- if (alloc) {
- DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0);
- __os_free(env, conn->input.repmgr_msg.cntrl.data);
- DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0);
- __os_free(env, conn->input.repmgr_msg.rec.data);
- }
- __repmgr_reset_for_reading(conn);
- if (ret == 0)
- *connp = conn;
- else {
- (void)__repmgr_close_connection(env, conn);
- (void)__repmgr_destroy_conn(env, conn);
- }
- return (ret);
-}
-
-/*
* PUBLIC: int __repmgr_site __P((DB_ENV *,
* PUBLIC: const char *, u_int, DB_SITE **, u_int32_t));
*/
@@ -2640,9 +2964,9 @@ site_by_addr(env, host, port, sitep)
if ((ret = addr_chk(env, host, port)) != 0)
return (ret);
+ ENV_ENTER(env, ip);
if (REP_ON(env)) {
LOCK_MUTEX(db_rep->mutex);
- ENV_ENTER(env, ip);
locked = TRUE;
} else
locked = FALSE;
@@ -2654,10 +2978,9 @@ site_by_addr(env, host, port, sitep)
* we want the DB_SITE handle to point to; just like site_by_eid() does.
*/
host = site->net_addr.host;
- if (locked) {
- ENV_LEAVE(env, ip);
+ if (locked)
UNLOCK_MUTEX(db_rep->mutex);
- }
+ ENV_LEAVE(env, ip);
if (ret != 0)
return (ret);
@@ -2723,7 +3046,7 @@ init_dbsite(env, eid, host, port, sitep)
dbsite->get_address = __repmgr_get_site_address;
dbsite->get_config = __repmgr_get_config;
dbsite->get_eid = __repmgr_get_eid;
- dbsite->set_config = __repmgr_site_config;
+ dbsite->set_config = __repmgr_site_config_pp;
dbsite->remove = __repmgr_remove_site_pp;
dbsite->close = __repmgr_site_close;
@@ -2756,9 +3079,16 @@ __repmgr_get_eid(dbsite, eidp)
DB_SITE *dbsite;
int *eidp;
{
+ DB_THREAD_INFO *ip;
+ ENV *env;
int ret;
- if ((ret = refresh_site(dbsite)) != 0)
+ env = dbsite->env;
+
+ ENV_ENTER(env, ip);
+ ret = refresh_site(dbsite);
+ ENV_LEAVE(env, ip);
+ if (ret != 0)
return (ret);
if (F_ISSET(dbsite, DB_SITE_PREOPEN)) {
@@ -2791,8 +3121,11 @@ __repmgr_get_config(dbsite, which, valuep)
env = dbsite->env;
db_rep = env->rep_handle;
- if ((ret = refresh_site(dbsite)) != 0)
+ ENV_ENTER(env, ip);
+ if ((ret = refresh_site(dbsite)) != 0) {
+ ENV_LEAVE(env, ip);
return (ret);
+ }
LOCK_MUTEX(db_rep->mutex);
DB_ASSERT(env, IS_VALID_EID(dbsite->eid));
site = SITE_FROM_EID(dbsite->eid);
@@ -2800,32 +3133,52 @@ __repmgr_get_config(dbsite, which, valuep)
rep = db_rep->region;
infop = env->reginfo;
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
sites = R_ADDR(infop, rep->siteinfo_off);
site->config = sites[dbsite->eid].config;
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
}
*valuep = FLD_ISSET(site->config, which) ? 1 : 0;
UNLOCK_MUTEX(db_rep->mutex);
+ ENV_LEAVE(env, ip);
return (0);
}
/*
- * PUBLIC: int __repmgr_site_config __P((DB_SITE *, u_int32_t, u_int32_t));
+ * PUBLIC: int __repmgr_site_config_pp __P((DB_SITE *, u_int32_t, u_int32_t));
*/
int
-__repmgr_site_config(dbsite, which, value)
+__repmgr_site_config_pp(dbsite, which, value)
DB_SITE *dbsite;
u_int32_t which;
u_int32_t value;
{
- DB_REP *db_rep;
DB_THREAD_INFO *ip;
ENV *env;
+ int ret;
+
+ env = dbsite->env;
+
+ ENV_ENTER(env, ip);
+ ret = __repmgr_site_config_int(dbsite, which, value);
+ ENV_LEAVE(env, ip);
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_site_config_int __P((DB_SITE *, u_int32_t, u_int32_t));
+ */
+int
+__repmgr_site_config_int(dbsite, which, value)
+ DB_SITE *dbsite;
+ u_int32_t which;
+ u_int32_t value;
+{
+ DB_REP *db_rep;
+ ENV *env;
REGINFO *infop;
REP *rep;
REPMGR_SITE *site;
@@ -2875,7 +3228,6 @@ __repmgr_site_config(dbsite, which, value)
infop = env->reginfo;
LOCK_MUTEX(db_rep->mutex);
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
sites = R_ADDR(infop, rep->siteinfo_off);
site = SITE_FROM_EID(dbsite->eid);
@@ -2896,7 +3248,6 @@ __repmgr_site_config(dbsite, which, value)
rep->siteinfo_seq++;
}
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
UNLOCK_MUTEX(db_rep->mutex);
} else {
site = SITE_FROM_EID(dbsite->eid);
@@ -2930,7 +3281,6 @@ set_local_site(dbsite, value)
if (REP_ON(env)) {
rep = db_rep->region;
LOCK_MUTEX(db_rep->mutex);
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
locked = TRUE;
/* Make sure we're in sync first. */
@@ -2941,31 +3291,32 @@ set_local_site(dbsite, value)
__db_errx(env, DB_STR("3666",
"A previously given local site may not be unset"));
ret = EINVAL;
- } else if (IS_VALID_EID(db_rep->self_eid) &&
- db_rep->self_eid != dbsite->eid) {
- __db_errx(env, DB_STR("3667",
- "A (different) local site has already been set"));
- ret = EINVAL;
- } else {
- DB_ASSERT(env, IS_VALID_EID(dbsite->eid));
- site = SITE_FROM_EID(dbsite->eid);
- if (FLD_ISSET(site->config,
- DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) {
- __db_errx(env, DB_STR("3668",
- "Local site cannot have HELPER or PEER attributes"));
+ } else if (value) {
+ if (IS_VALID_EID(db_rep->self_eid) &&
+ db_rep->self_eid != dbsite->eid) {
+ __db_errx(env, DB_STR("3697",
+ "A (different) local site has already been set"));
ret = EINVAL;
+ } else {
+ DB_ASSERT(env, IS_VALID_EID(dbsite->eid));
+ site = SITE_FROM_EID(dbsite->eid);
+ if (FLD_ISSET(site->config,
+ DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) {
+ __db_errx(env, DB_STR("3698",
+ "Local site cannot have HELPER or PEER attributes"));
+ ret = EINVAL;
+ }
}
}
- if (ret == 0) {
+ if (ret == 0 && value) {
db_rep->self_eid = dbsite->eid;
if (locked) {
- rep->self_eid = dbsite->eid;
+ rep->self_eid = db_rep->self_eid;
rep->siteinfo_seq++;
}
}
if (locked) {
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
UNLOCK_MUTEX(db_rep->mutex);
}
return (ret);
@@ -2998,7 +3349,7 @@ refresh_site(dbsite)
}
static int
-__repmgr_remove_site_pp(dbsite)
+__repmgr_remove_and_close_site(dbsite)
DB_SITE *dbsite;
{
int ret, t_ret;
@@ -3011,6 +3362,23 @@ __repmgr_remove_site_pp(dbsite)
*/
if ((t_ret = __repmgr_site_close(dbsite)) != 0 && ret == 0)
ret = t_ret;
+
+ return (ret);
+}
+
+static int
+__repmgr_remove_site_pp(dbsite)
+ DB_SITE *dbsite;
+{
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ int ret;
+
+ env = dbsite->env;
+ /*
+ * Pre/post-processing wrapper: bracket the remove-and-close work with
+ * ENV_ENTER/ENV_LEAVE and hand its return value back to the caller.
+ * env is saved in a local before the call so that ENV_LEAVE does not
+ * have to read it from dbsite afterwards (the callee closes the
+ * DB_SITE handle).
+ */
+ ENV_ENTER(env, ip);
+ ret = __repmgr_remove_and_close_site(dbsite);
+ ENV_LEAVE(env, ip);
return (ret);
}
@@ -3024,6 +3392,7 @@ __repmgr_remove_site(dbsite)
REPMGR_CONNECTION *conn;
repmgr_netaddr_t addr;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
u_int8_t *response_buf, siteinfo_buf[MAX_MSG_BUF];
size_t len;
u_int32_t type;
@@ -3046,23 +3415,33 @@ __repmgr_remove_site(dbsite)
DB_ASSERT(env, IS_VALID_EID(master));
addr = SITE_FROM_EID(master)->net_addr;
UNLOCK_MUTEX(db_rep->mutex);
-
len = strlen(dbsite->host) + 1;
- DB_INIT_DBT(site_info.host, dbsite->host, len);
- site_info.port = dbsite->port;
- site_info.flags = 0;
- ret = __repmgr_site_info_marshal(env,
- &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
- DB_ASSERT(env, ret == 0);
conn = NULL;
response_buf = NULL;
- if ((ret = make_request_conn(env, &addr, &conn)) != 0)
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
return (ret);
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
+ if (conn->version < 5) {
+ DB_INIT_DBT(v4site_info.host, dbsite->host, len);
+ v4site_info.port = dbsite->port;
+ v4site_info.flags = 0;
+ ret = __repmgr_v4site_info_marshal(env,
+ &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
+ } else {
+ DB_INIT_DBT(site_info.host, dbsite->host, len);
+ site_info.port = dbsite->port;
+ site_info.status = 0;
+ site_info.flags = 0;
+ ret = __repmgr_site_info_marshal(env,
+ &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
+ }
+ DB_ASSERT(env, ret == 0);
+
if ((ret = __repmgr_send_sync_msg(env, conn,
REPMGR_REMOVE_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0)
goto err;
- if ((ret = read_own_msg(env,
+ if ((ret = __repmgr_read_own_msg(env,
conn, &type, &response_buf, &len)) != 0)
goto err;
ret = type == REPMGR_REMOVE_SUCCESS ? 0 : DB_REP_UNAVAIL;
@@ -3090,3 +3469,82 @@ __repmgr_site_close(dbsite)
__os_free(dbsite->env, dbsite);
return (0);
}
+
+/*
+ * Demotes a participant site to a view. This is a one-way and one-time
+ * operation.
+ *
+ * The demotion occurs at the very end of repmgr_start() because it
+ * requires a select thread to perform the gmdb operations that remove
+ * the site from the replication group and immediately add the site back
+ * into the group as a view. The demotion also preserves any other threads
+ * created by repmgr_start() so that they are there to be used by the
+ * demoted site after it is re-added as a view site.
+ *
+ * We remove and re-add the site to propagate the site's change from
+ * participant to view to all sites in the replication group. This includes
+ * updates to each site's gmdb and in-memory site list.
+ *
+ * Parameters:
+ * env - environment; its rep_handle (DB_REP) and region (REP) are used.
+ * eid - EID of the site to demote. It is looked up with SITE_FROM_EID()
+ * and its host and port are used to build a temporary DB_SITE handle.
+ *
+ * Returns 0 on success. Returns DB_REP_UNAVAIL if rep->master_id is still
+ * DB_EID_INVALID when the polling limit is reached. Otherwise returns the
+ * first non-zero value produced by init_dbsite(), __repmgr_remove_site(),
+ * __repmgr_join_group() or __repmgr_site_close().
+ *
+ * db_rep->demotion_pending is set to TRUE on entry and is reset to FALSE on
+ * every exit path, because all paths leave through the err label.
+ *
+ * The wait for a master is bounded by REPMGR_DEMOTION_MASTER_RETRIES polls,
+ * each preceded by an __os_yield() call that is given
+ * REPMGR_DEMOTION_RETRY_USECS as its third argument (presumably
+ * microseconds, i.e. about five seconds in total -- confirm against
+ * __os_yield()).
+ */
+#define REPMGR_DEMOTION_MASTER_RETRIES 10
+#define REPMGR_DEMOTION_RETRY_USECS 500000
+static int
+__repmgr_demote_site(env, eid)
+ ENV *env;
+ int eid;
+{
+ DB_REP *db_rep;
+ DB_SITE *dbsite;
+ REP *rep;
+ REPMGR_SITE *site;
+ int ret, t_ret, tries;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ site = SITE_FROM_EID(eid);
+ dbsite = NULL;
+
+ /* Inform other repmgr threads that a demotion is in progress. */
+ db_rep->demotion_pending = TRUE;
+
+ if ((ret = init_dbsite(env, eid, site->net_addr.host,
+ site->net_addr.port, &dbsite)) != 0)
+ goto err;
+
+ /*
+ * We need a master to perform the gmdb updates. Poll periodically
+ * for a limited time to find one.
+ *
+ * NOTE(review): the retry limit is tested immediately after each
+ * sleep, so rep->master_id is not re-read after the final wait; a
+ * master that shows up during that last interval still results in
+ * DB_REP_UNAVAIL. Also, rep->master_id is read here without this
+ * routine taking a mutex -- confirm that is intended.
+ */
+ tries = 0;
+ while (rep->master_id == DB_EID_INVALID) {
+ __os_yield(env, 0, REPMGR_DEMOTION_RETRY_USECS);
+ if (++tries >= REPMGR_DEMOTION_MASTER_RETRIES) {
+ ret = DB_REP_UNAVAIL;
+ goto err;
+ }
+ }
+
+ /*
+ * Remove site from replication group, using the temporary DB_SITE
+ * handle built above.
+ */
+ if ((ret = __repmgr_remove_site(dbsite)) != 0)
+ goto err;
+
+ /*
+ * Add site back into replication group as a view. This demotion is
+ * occurring because this site now has a view callback but its
+ * SITE_VIEW flag is not set. Now, __repmgr_join_group() will detect
+ * the view callback and set the SITE_VIEW flag before sending this
+ * site's information to the rest of the replication group.
+ */
+ if ((ret = __repmgr_join_group(env)) != 0)
+ goto err;
+
+err:
+ /*
+ * __repmgr_site_close() deallocates dbsite. Its return value is
+ * reported only when nothing has failed earlier, so the first error
+ * is the one the caller sees.
+ */
+ if (dbsite != NULL) {
+ t_ret = __repmgr_site_close(dbsite);
+ if (ret == 0 && t_ret != 0)
+ ret = t_ret;
+ }
+ /* Must reset demotion_pending before leaving this routine. */
+ db_rep->demotion_pending = FALSE;
+ return (ret);
+}