summaryrefslogtreecommitdiff
path: root/src/repmgr/repmgr_sel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/repmgr/repmgr_sel.c')
-rw-r--r--src/repmgr/repmgr_sel.c726
1 files changed, 664 insertions, 62 deletions
diff --git a/src/repmgr/repmgr_sel.c b/src/repmgr/repmgr_sel.c
index ba14368f..c32dad25 100644
--- a/src/repmgr/repmgr_sel.c
+++ b/src/repmgr/repmgr_sel.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -12,7 +12,7 @@
typedef int (*HEARTBEAT_ACTION) __P((ENV *));
-static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
+static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *, int *));
static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static void check_min_log_file __P((ENV *));
static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
@@ -23,13 +23,18 @@ static int process_parameters __P((ENV *,
static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
static int record_permlsn __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_call_election __P((ENV *));
+static int __repmgr_check_listener __P((ENV *));
+static int __repmgr_check_master_listener __P((ENV *));
static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *));
static void *__repmgr_connector_thread __P((void *));
static int __repmgr_next_timeout __P((ENV *,
db_timespec *, HEARTBEAT_ACTION *));
+static int __repmgr_reset_last_rcvd __P((ENV *));
static int __repmgr_retry_connections __P((ENV *));
static int __repmgr_send_heartbeat __P((ENV *));
-static int __repmgr_try_one __P((ENV *, int));
+static int __repmgr_start_takeover __P((ENV *));
+static void *__repmgr_takeover_thread __P((void *));
+static int __repmgr_try_one __P((ENV *, int, int));
static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *));
static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
@@ -49,17 +54,24 @@ void *
__repmgr_select_thread(argsp)
void *argsp;
{
- REPMGR_RUNNABLE *args;
ENV *env;
+ DB_THREAD_INFO *ip;
int ret;
+ REPMGR_RUNNABLE *args;
args = argsp;
env = args->env;
+ ip = NULL;
+ ret = 0;
- if ((ret = __repmgr_select_loop(env)) != 0) {
+ ENV_ENTER_RET(env, ip, ret);
+ if (ret != 0 || (ret = __repmgr_select_loop(env)) != 0) {
__db_err(env, ret, DB_STR("3614", "select loop failed"));
+ ENV_LEAVE(env, ip);
(void)__repmgr_thread_failure(env, ret);
}
+ if (ret == 0)
+ ENV_LEAVE(env, ip);
return (NULL);
}
@@ -71,12 +83,19 @@ __repmgr_bow_out(env)
ENV *env;
{
DB_REP *db_rep;
+ REP *rep;
int ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
LOCK_MUTEX(db_rep->mutex);
ret = __repmgr_stop_threads(env);
UNLOCK_MUTEX(db_rep->mutex);
+ /*
+ * Reset sites_avail so that it will be calculated correctly if this
+ * site rejoins the group in the future.
+ */
+ rep->sites_avail = 0;
DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL);
return (ret);
}
@@ -187,23 +206,53 @@ __repmgr_compute_timeout(env, timeout)
db_rep = env->rep_handle;
/*
- * There are two factors to consider: are heartbeats in use? and, do we
+ * There are four factors to consider: are heartbeats in use? do we
* have any sites with broken connections that we ought to retry?
+ * is there a listener process running locally? do we need to call
+ * an election if no master listener exists?
*/
have_timeout = __repmgr_next_timeout(env, &t, NULL);
/* List items are in order, so we only have to examine the first one. */
if (!TAILQ_EMPTY(&db_rep->retries)) {
retry = TAILQ_FIRST(&db_rep->retries);
- if (have_timeout) {
+ if (have_timeout)
/* Choose earliest timeout deadline. */
t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
- } else {
+ else {
t = retry->time;
have_timeout = TRUE;
}
}
+ /* Check listener every timeout in subordinate rep-aware process. */
+ if (IS_LISTENER_CAND(db_rep)) {
+ if (!timespecisset(&db_rep->l_listener_chk)) {
+ __os_gettime(env, &now, 1);
+ TIMESPEC_ADD_DB_TIMEOUT(&now, db_rep->l_listener_wait);
+ db_rep->l_listener_chk = now;
+ }
+ if (have_timeout)
+ t = timespeccmp(&db_rep->l_listener_chk, &t, <) ?
+ db_rep->l_listener_chk : t;
+ else {
+ t = db_rep->l_listener_chk;
+ have_timeout = TRUE;
+ }
+ }
+
+ /* Check master listener if needed. */
+ if (FLD_ISSET(db_rep->region->config, REP_C_AUTOTAKEOVER) &&
+ timespecisset(&db_rep->m_listener_chk)) {
+ if (have_timeout)
+ t = timespeccmp(&db_rep->m_listener_chk, &t, <) ?
+ db_rep->m_listener_chk : t;
+ else {
+ t = db_rep->m_listener_chk;
+ have_timeout = TRUE;
+ }
+ }
+
if (have_timeout) {
__os_gettime(env, &now, 1);
if (timespeccmp(&now, &t, >=))
@@ -242,7 +291,17 @@ __repmgr_next_timeout(env, deadline, action)
if (rep->master_id == db_rep->self_eid &&
rep->heartbeat_frequency > 0) {
- t = db_rep->last_bcast;
+ /*
+ * A temporary master in preferred master mode must send
+ * regular heartbeats regardless of other activity because
+ * the preferred master requires a heartbeat to take over as
+ * master after it has synced with the temporary master.
+ */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT))
+ t = db_rep->last_hbeat;
+ else
+ t = db_rep->last_bcast;
TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency);
my_action = __repmgr_send_heartbeat;
} else if ((master = __repmgr_connected_master(env)) != NULL &&
@@ -301,6 +360,24 @@ __repmgr_send_heartbeat(env)
db_rep = env->rep_handle;
rep = db_rep->region;
+ ret = 0;
+
+ /*
+ * Check test hook preventing heartbeats and connection attempts.
+ * This is used to create and maintain a dupmaster condition in
+ * a test until the test hook is rescinded.
+ */
+ DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT);
+
+ /*
+ * Track last heartbeat for temporary master in preferred master
+ * mode so that it will send regular heartbeats regardless of
+ * other activity.
+ */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) &&
+ rep->master_id == db_rep->self_eid)
+ __os_gettime(env, &db_rep->last_hbeat, 1);
permlsn.generation = rep->gen;
if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0)
@@ -310,8 +387,11 @@ __repmgr_send_heartbeat(env)
control.size = __REPMGR_PERMLSN_SIZE;
DB_INIT_DBT(rec, NULL, 0);
- return (__repmgr_send_broadcast(env,
- REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3));
+ ret =__repmgr_send_broadcast(env,
+ REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3);
+
+DB_TEST_RECOVERY_LABEL
+ return (ret);
}
/*
@@ -373,6 +453,8 @@ __repmgr_check_timeouts(env)
HEARTBEAT_ACTION action;
int ret;
+ ret = 0;
+
/*
* Figure out the next heartbeat-related thing to be done. Then, if
* it's time to do it, do so.
@@ -384,7 +466,342 @@ __repmgr_check_timeouts(env)
return (ret);
}
- return (__repmgr_retry_connections(env));
+ /* Check the existence of local listener. */
+ if ((ret = __repmgr_check_listener(env)) != 0)
+ return (ret);
+
+ /* Check the existence of master listener. */
+ if ((ret = __repmgr_check_master_listener(env)) != 0)
+ return (ret);
+
+ /*
+ * Check test hook preventing heartbeats and connection attempts.
+ * This is used to create and maintain a dupmaster condition in
+ * a test until the test hook is rescinded.
+ */
+ DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT);
+
+ ret = __repmgr_retry_connections(env);
+
+DB_TEST_RECOVERY_LABEL
+ return (ret);
+}
+
+/*
+ * Check the existence of the listener process on the local site. If one
+ * does not exist and the current process is a subordinate rep-aware process,
+ * then start a takeover thread to covert this process to the listener process.
+ */
+static int
+__repmgr_check_listener(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ SITEINFO *sites;
+ db_timespec t;
+ int ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
+
+ /*
+ * Only subordinate rep-aware process can take over listener role, so
+ * no need to check listener in listener process or rep unaware process.
+ */
+ if (!IS_LISTENER_CAND(db_rep))
+ return (0);
+
+ /*
+ * If the listener quits due to site removal, no subordinate process
+ * should take over as listener as the current site is not expected
+ * to be active in the group. Check the status from the site array
+ * in the shared region instead of that in the GMDB. We do this
+ * because the GMDB doesn't apply the change yet when replication
+ * is stopped on the removed site.
+ */
+ sites = R_ADDR(env->reginfo, rep->siteinfo_off);
+ if (sites[rep->self_eid].status == SITE_DELETING)
+ return (0);
+
+ /*
+ * Check the listener after timeout. If there is no listener, we
+ * take over. During takeover, we will refresh all connections.
+ * A subordinate process does not have an up-to-date site list, so sync
+ * up addresses from the in-memory site array before takeover.
+ */
+ __os_gettime(env, &t, 1);
+ if (timespeccmp(&t, &db_rep->l_listener_chk, >=)) {
+ /* Compute the next timeout. */
+ TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->l_listener_wait);
+ db_rep->l_listener_chk = t;
+
+ /* Check if site address information needs to be refreshed. */
+ if ((rep->siteinfo_seq > db_rep->siteinfo_seq) &&
+ (ret = __repmgr_sync_siteaddr(env)) != 0)
+ return (ret);
+
+ if (rep->listener == 0)
+ ret = __repmgr_start_takeover(env);
+ }
+ return (ret);
+}
+
+/*
+ * Start a thread to take over the listener role in the current subordinate
+ * process.
+ */
+static int
+__repmgr_start_takeover(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_RUNNABLE *th;
+ int ret;
+
+ db_rep = env->rep_handle;
+ th = db_rep->takeover_thread;
+ if (th == NULL) {
+ if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE),
+ &th)) != 0)
+ return (ret);
+ db_rep->takeover_thread = th;
+ } else if (th->finished) {
+ if ((ret = __repmgr_thread_join(th)) != 0)
+ return (ret);
+ } else {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "takeover thread still running"));
+ return (0);
+ }
+ th->run = __repmgr_takeover_thread;
+ if ((ret = __repmgr_thread_start(env, th)) != 0) {
+ __os_free(env, th);
+ db_rep->takeover_thread = NULL;
+ }
+ return (ret);
+}
+
+/*
+ * Take over listener role in the current subordinate process.
+ */
+static void *
+__repmgr_takeover_thread(argsp)
+ void *argsp;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+ REPMGR_RUNNABLE *th;
+ int nthreads, ret, save_policy;
+
+ th = argsp;
+ env = th->env;
+ db_rep = env->rep_handle;
+ ip = NULL;
+ rep = db_rep->region;
+ ret = 0;
+
+ ENV_ENTER_RET(env, ip, ret);
+ if (ret != 0)
+ goto out;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC, "starting takeover thread"));
+ /*
+ * It is likely that there is an old heartbeat ready to expire
+ * immediately upon restarting repmgr, leading to an unnecessary
+ * election. Reset the expiration countdown here to avoid this.
+ */
+ if ((ret = __repmgr_reset_last_rcvd(env)) != 0)
+ goto out;
+ /*
+ * If nthreads is set to be 0 in the current subordinate process, use
+ * the value in the last listener. The nthreads should be larger than
+ * 0 in listener.
+ */
+ nthreads = db_rep->config_nthreads == 0 ? (int)rep->listener_nthreads :
+ db_rep->config_nthreads;
+ /*
+ * It is possible that this subordinate process does not have intact
+ * connections to the other sites. For most ack policies, restarting
+ * repmgr will wait for acks when it commits its transaction to reload
+ * the gmdb. Temporarily set the ack policy to NONE for the takeover
+ * so that it is not delayed waiting for acks that can never come.
+ */
+ save_policy = rep->perm_policy;
+ rep->perm_policy = DB_REPMGR_ACKS_NONE;
+ /*
+ * Restart the repmgr as listener. If DB_REP_IGNORE is returned,
+ * the current process has become listener. If DB_REP_UNAVAIL is
+ * returned, the site has been removed from the group and no listener
+ * should be started. For any other error, if the replication is
+ * stopped because of the takeover thread, we will notify the
+ * application.
+ */
+ ret = __repmgr_start_int(env, nthreads, F_ISSET(rep, REP_F_MASTER) ?
+ DB_REP_MASTER : DB_REP_CLIENT);
+ if (ret == 0 && !IS_SUBORDINATE(db_rep) &&
+ db_rep->repmgr_status == running) {
+ STAT(rep->mstat.st_takeovers++);
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "finished takeover and became listener"));
+ } else if (ret != 0 && db_rep->repmgr_status == stopped) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "failed to take over, repmgr was stopped"));
+ DB_EVENT(env, DB_EVENT_REP_AUTOTAKEOVER_FAILED, NULL);
+ } else {
+ /* The current process is not changed to listener. */
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC, "failed to take over"));
+ }
+ rep->perm_policy = save_policy;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC, "takeover thread is exiting"));
+ ENV_LEAVE(env, ip);
+out: th->finished = TRUE;
+ return (NULL);
+}
+
+/*
+ * Reset the last_rcvd_timestamp to restart the wait for a heartbeat
+ * monitor expiration.
+ */
+static int
+__repmgr_reset_last_rcvd(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_SITE *master;
+
+ db_rep = env->rep_handle;
+
+ LOCK_MUTEX(db_rep->mutex);
+ if ((master = __repmgr_connected_master(env)) != NULL)
+ __os_gettime(env, &master->last_rcvd_timestamp, 1);
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (0);
+}
+
+/*
+ * Monitor the connection to master listener. When the master listener is
+ * disconnected and some other master process might take over as listener
+ * soon, we will delay the election. After the delay if there is still no
+ * connection from master listener, call an election then.
+ */
+static int
+__repmgr_check_master_listener(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ REPMGR_SITE *master;
+ db_timespec t;
+ u_int32_t flags;
+ int ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
+
+ /*
+ * We only check for a master listener if m_listener_chk is set.
+ * The field is only set when __repmgr_bust_connection() previously
+ * detected the loss of our connection to the master listener.
+ * If rep->master_id is invalid, wait until it is ready to check.
+ */
+ if (!FLD_ISSET((db_rep)->region->config, REP_C_AUTOTAKEOVER) ||
+ !timespecisset(&db_rep->m_listener_chk) ||
+ !IS_VALID_EID(rep->master_id))
+ return (0);
+
+ __os_gettime(env, &t, 1);
+ if (timespeccmp(&t, &db_rep->m_listener_chk, >=)) {
+ master = SITE_FROM_EID(db_rep->region->master_id);
+ if (master->ref.conn.out == NULL &&
+ master->ref.conn.in == NULL) {
+ flags = ELECT_F_EVENT_NOTIFY;
+ if (FLD_ISSET(db_rep->region->config, REP_C_ELECTIONS))
+ LF_SET(ELECT_F_IMMED | ELECT_F_FAST);
+ else
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Master failure, but no elections"));
+
+ /*
+ * In preferred master mode, a client that has lost its
+ * connection to the master uses an election thread to
+ * restart as master.
+ */
+ if (IS_PREFMAS_MODE(env)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+"check_master_listener setting preferred master temp master"));
+ db_rep->prefmas_pending = start_temp_master;
+ }
+
+ ret = __repmgr_init_election(env, flags);
+ }
+ /*
+ * If the delay has expired reset m_listener_chk. We reset
+ * it whether or not the master listener process comes back
+ * so that we will not continue checking for a master listener
+ * indefinitely.
+ */
+ timespecclear(&db_rep->m_listener_chk);
+ }
+ return (ret);
+}
+
+/*
+ * Wake up I/O waiting in selector thread, refresh connections to all connected
+ * and present sites.
+ *
+ * PUBLIC: int __repmgr_refresh_selector __P((ENV *));
+ */
+int
+__repmgr_refresh_selector(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ REPMGR_RETRY *retry;
+ REPMGR_SITE *site;
+ SITEINFO *sites;
+ int eid, ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ if ((ret = __repmgr_wake_main_thread(env)) != 0)
+ return (ret);
+
+ FOR_EACH_REMOTE_SITE_INDEX(eid) {
+ SET_LISTENER_CAND(1, = 0);
+ site = SITE_FROM_EID(eid);
+
+ /*
+ * It is possible some sites were left in a paused state
+ * during the switch, so they have to be removed from the
+ * retry list.
+ */
+ if (site->state == SITE_PAUSING) {
+ retry = site->ref.retry;
+ if (retry != NULL) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Removing site from retry list eid %lu",
+ (u_long)eid));
+ TAILQ_REMOVE(&db_rep->retries, retry, entries);
+ __os_free(env, retry);
+ site->ref.retry = NULL;
+ }
+
+ }
+ /*
+ * Try to connect to any site that is now PRESENT after
+ * rereading the gmdb.
+ */
+ if (site->membership == SITE_PRESENT &&
+ (ret = __repmgr_try_one(env, eid, TRUE)) != 0)
+ return (ret);
+ }
+ return (0);
}
/*
@@ -415,10 +832,11 @@ __repmgr_retry_connections(env)
__os_free(env, retry);
DB_ASSERT(env, IS_VALID_EID(eid));
site = SITE_FROM_EID(eid);
+ site->ref.retry = NULL;
DB_ASSERT(env, site->state == SITE_PAUSING);
if (site->membership == SITE_PRESENT) {
- if ((ret = __repmgr_try_one(env, eid)) != 0)
+ if ((ret = __repmgr_try_one(env, eid, FALSE)) != 0)
return (ret);
} else
site->state = SITE_IDLE;
@@ -437,11 +855,23 @@ __repmgr_first_try_connections(env)
ENV *env;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_SITE *site;
+ SITEINFO *sites;
int eid, ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ /*
+ * Check test hook preventing heartbeats and connection attempts.
+ * This is used to create and maintain a dupmaster condition in
+ * a test until the test hook is rescinded.
+ */
+ DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT);
+
FOR_EACH_REMOTE_SITE_INDEX(eid) {
+ SET_LISTENER_CAND(1, = 0);
site = SITE_FROM_EID(eid);
/*
* Normally all sites would be IDLE here. But if a user thread
@@ -453,19 +883,22 @@ __repmgr_first_try_connections(env)
*/
if (site->state == SITE_IDLE &&
site->membership == SITE_PRESENT &&
- (ret = __repmgr_try_one(env, eid)) != 0)
+ (ret = __repmgr_try_one(env, eid, FALSE)) != 0)
return (ret);
}
+DB_TEST_RECOVERY_LABEL
return (0);
}
/*
- * Starts a thread to open a connection to the site at the given EID.
+ * Starts a thread to open a connection to the site at the given EID. We might
+ * have no connection to the site, or an existing connection to be replaced.
*/
static int
-__repmgr_try_one(env, eid)
+__repmgr_try_one(env, eid, refresh)
ENV *env;
int eid;
+ int refresh;
{
DB_REP *db_rep;
REPMGR_SITE *site;
@@ -488,13 +921,22 @@ __repmgr_try_one(env, eid)
"eid %lu previous connector thread still running; will retry",
(u_long)eid));
return (__repmgr_schedule_connection_attempt(env,
- eid, FALSE));
+ eid, refresh));
}
site->state = SITE_CONNECTING;
th->run = __repmgr_connector_thread;
- th->args.eid = eid;
+ th->args.conn_th.eid = eid;
+ /*
+ * The flag CONNECT_F_REFRESH indicates an immediate connection attempt
+ * should be scheduled if the current connection attempt fails. It is
+ * turned on before the first attempt to refresh the connection but
+ * turned off if the first attempt fails. In this way, when refreshing
+ * the connection, there will be at most two immediate connection
+ * attempts, after that, retry as usual.
+ */
+ th->args.conn_th.flags = refresh ? CONNECT_F_REFRESH : 0;
if ((ret = __repmgr_thread_start(env, th)) != 0) {
__os_free(env, th);
site->connector = NULL;
@@ -506,21 +948,33 @@ static void *
__repmgr_connector_thread(argsp)
void *argsp;
{
- REPMGR_RUNNABLE *th;
ENV *env;
+ DB_THREAD_INFO *ip;
+ REPMGR_RUNNABLE *th;
int ret;
th = argsp;
env = th->env;
+ ip = NULL;
+ ret = 0;
- RPRINT(env, (env, DB_VERB_REPMGR_MISC,
- "starting connector thread, eid %u", th->args.eid));
- if ((ret = __repmgr_connector_main(env, th)) != 0) {
+ ENV_ENTER_RET(env, ip, ret);
+ if (ret == 0)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "starting connector thread, eid %u",
+ th->args.conn_th.eid));
+ if (ret != 0 || (ret = __repmgr_connector_main(env, th)) != 0) {
__db_err(env, ret, DB_STR("3617", "connector thread failed"));
+ RPRINT(env, (env,
+ DB_VERB_REPMGR_MISC, "connector thread is exiting"));
+ ENV_LEAVE(env, ip);
(void)__repmgr_thread_failure(env, ret);
}
- RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting"));
-
+ if (ret == 0) {
+ RPRINT(env, (env,
+ DB_VERB_REPMGR_MISC, "connector thread is exiting"));
+ ENV_LEAVE(env, ip);
+ }
th->finished = TRUE;
return (NULL);
}
@@ -542,8 +996,8 @@ __repmgr_connector_main(env, th)
ret = 0;
LOCK_MUTEX(db_rep->mutex);
- DB_ASSERT(env, IS_VALID_EID(th->args.eid));
- site = SITE_FROM_EID(th->args.eid);
+ DB_ASSERT(env, IS_VALID_EID(th->args.conn_th.eid));
+ site = SITE_FROM_EID(th->args.conn_th.eid);
if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped)
goto unlock;
@@ -563,7 +1017,8 @@ __repmgr_connector_main(env, th)
UNLOCK_MUTEX(db_rep->mutex);
if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) {
- DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &th->args.eid);
+ DB_EVENT(env,
+ DB_EVENT_REP_CONNECT_ESTD, &th->args.conn_th.eid);
LOCK_MUTEX(db_rep->mutex);
if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
__db_err(env, ret, DB_STR("3618",
@@ -571,33 +1026,53 @@ __repmgr_connector_main(env, th)
goto cleanup;
}
conn->type = REP_CONNECTION;
- site = SITE_FROM_EID(th->args.eid);
+ site = SITE_FROM_EID(th->args.conn_th.eid);
if (site->state != SITE_CONNECTING ||
db_rep->repmgr_status == stopped)
goto cleanup;
- conn->eid = th->args.eid;
- site = SITE_FROM_EID(th->args.eid);
- site->ref.conn.out = conn;
+ conn->eid = th->args.conn_th.eid;
+ site = SITE_FROM_EID(th->args.conn_th.eid);
+ /*
+ * If there is an existing outgoing connection, disable it and
+ * replace it with a new connection. The sites for a formerly
+ * subordinate handle that is now taking over might still be
+ * SITE_CONNECTING. Set to SITE_CONNECTED before disabling
+ * connection so that sites_avail is correctly maintained.
+ */
site->state = SITE_CONNECTED;
+ if (site->ref.conn.out != NULL)
+ (void)__repmgr_disable_connection(env,
+ site->ref.conn.out);
+ site->ref.conn.out = conn;
__os_gettime(env, &site->last_rcvd_timestamp, 1);
ret = __repmgr_wake_main_thread(env);
} else if (ret == DB_REP_UNAVAIL) {
/* Retryable error while trying to connect: retry later. */
- info.eid = th->args.eid;
+ info.eid = th->args.conn_th.eid;
info.error = err;
DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info);
STAT(db_rep->region->mstat.st_connect_fail++);
LOCK_MUTEX(db_rep->mutex);
- site = SITE_FROM_EID(th->args.eid);
+ site = SITE_FROM_EID(th->args.conn_th.eid);
if (site->state != SITE_CONNECTING ||
db_rep->repmgr_status == stopped) {
ret = 0;
goto unlock;
}
+ /*
+ * If it fails to create a new outgoing connection to replace
+ * the existing one in the first attempt, schedule another
+ * immediate attempt. If it is our second attempt, disable
+ * the existing connections and retry as normal.
+ */
+ if (site->ref.conn.out != NULL && th->args.conn_th.flags == 0)
+ (void)__repmgr_disable_connection(env,
+ site->ref.conn.out);
ret = __repmgr_schedule_connection_attempt(env,
- th->args.eid, FALSE);
+ th->args.conn_th.eid,
+ th->args.conn_th.flags == CONNECT_F_REFRESH);
} else
goto out;
@@ -842,6 +1317,7 @@ prepare_input(env, conn)
if ((ret = __os_malloc(env, memsize, &membase)) != 0)
return (ret);
conn->input.rep_message = membase;
+ conn->input.rep_message->size = memsize;
conn->input.rep_message->msg_hdr = msg_hdr;
conn->input.rep_message->v.repmsg.originating_eid = conn->eid;
@@ -876,6 +1352,7 @@ prepare_input(env, conn)
if ((ret = __os_malloc(env, memsize, &membase)) != 0)
return (ret);
conn->input.rep_message = membase;
+ conn->input.rep_message->size = memsize;
conn->input.rep_message->msg_hdr = msg_hdr;
conn->input.rep_message->v.appmsg.conn = conn;
@@ -891,6 +1368,7 @@ prepare_input(env, conn)
if ((ret = __os_malloc(env, size, &membase)) != 0)
return (ret);
conn->input.rep_message = membase;
+ conn->input.rep_message->size = size;
conn->input.rep_message->msg_hdr = msg_hdr;
/*
@@ -1065,16 +1543,18 @@ dispatch_msgin(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
+ DBT *dbt;
DB_REP *db_rep;
- REPMGR_SITE *site;
- REPMGR_RUNNABLE *th;
+ REP *rep;
REPMGR_RESPONSE *resp;
- DBT *dbt;
+ REPMGR_RUNNABLE *th;
+ REPMGR_SITE *site;
char *hostname;
- int eid, ret;
+ int eid, ret, subord;
DB_ASSERT(env, conn->reading_phase == DATA_PHASE);
db_rep = env->rep_handle;
+ rep = db_rep->region;
switch (conn->state) {
case CONN_CONNECTED:
@@ -1129,9 +1609,22 @@ dispatch_msgin(env, conn)
dbt = &conn->input.repmgr_msg.rec;
hostname = dbt->data;
hostname[dbt->size-1] = '\0';
- if ((ret = accept_handshake(env, conn, hostname)) != 0)
+ if ((ret = accept_handshake(env,
+ conn, hostname, &subord)) != 0)
return (ret);
conn->state = CONN_READY;
+ site = SITE_FROM_EID(conn->eid);
+ /*
+ * Do not increase sites_avail redundantly for an
+ * incoming subordinate connection.
+ */
+ if (conn->type == REP_CONNECTION &&
+ site->state == SITE_CONNECTED && !subord) {
+ rep->sites_avail++;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "msgin: EID %lu CONNECTED, READY. sites_avail %lu",
+ (u_long)conn->eid, (u_long)rep->sites_avail));
+ }
break;
case REPMGR_OWN_MSG:
/*
@@ -1279,9 +1772,11 @@ process_own_msg(env, conn)
REPMGR_SITE *site;
REPMGR_MESSAGE *msg;
__repmgr_connect_reject_args reject;
+ __repmgr_v4connect_reject_args v4reject;
__repmgr_parm_refresh_args parms;
int ret;
+ db_rep = env->rep_handle;
ret = 0;
/*
* Set "msg" to point to the message struct. If we do all necessary
@@ -1293,28 +1788,61 @@ process_own_msg(env, conn)
switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) {
case REPMGR_CONNECT_REJECT:
dbt = &msg->v.gmdb_msg.request;
- if ((ret = __repmgr_connect_reject_unmarshal(env,
- &reject, dbt->data, dbt->size, NULL)) != 0)
- return (DB_REP_UNAVAIL);
+ if (conn->version < 5) {
+ if ((ret = __repmgr_v4connect_reject_unmarshal(env,
+ &v4reject, dbt->data, dbt->size, NULL)) != 0)
+ return (DB_REP_UNAVAIL);
+ reject.version = v4reject.version;
+ reject.gen = v4reject.gen;
+ reject.status = 0;
+ } else {
+ if ((ret = __repmgr_connect_reject_unmarshal(env,
+ &reject, dbt->data, dbt->size, NULL)) != 0)
+ return (DB_REP_UNAVAIL);
+ }
/*
* If we're being rejected by someone who has more up-to-date
- * membership information than we do, it means we have been
- * removed from the group. If we've just gotten started, we can
- * make one attempt at automatically rejoining; otherwise we bow
- * out gracefully.
+ * membership information than we do, it means we are not in
+ * the group. If we've just gotten started, or our status is
+ * adding, we can make one attempt at automatically rejoining;
+ * otherwise we bow out gracefully.
*/
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
- "got rejection msg citing version %lu/%lu",
- (u_long)reject.gen, (u_long)reject.version));
+ "got rejection msg citing version %lu/%lu mine %lu/%lu membership %lu",
+ (u_long)reject.gen, (u_long)reject.version,
+ (u_long)db_rep->member_version_gen,
+ (u_long)db_rep->membership_version,
+ (u_long)reject.status));
if (__repmgr_gmdb_version_cmp(env,
reject.gen, reject.version) > 0) {
- if (env->rep_handle->seen_repmsg)
+ if (db_rep->seen_repmsg && reject.status != SITE_ADDING)
ret = DB_DELETED;
- else if ((ret = __repmgr_defer_op(env,
- REPMGR_REJOIN)) == 0)
- ret = DB_REP_UNAVAIL;
+ else {
+ /*
+ * If 2SITE_STRICT is off, we are likely to
+ * win an election with our own vote before
+ * discovering there is already a master.
+ * Set indicator to defer the election until
+ * after rejoining group.
+ *
+ * In preferred master mode, either site
+ * should defer the election (which
+ * executes the preferred master startup
+ * code and only calls an election if it is
+ * safe) and also avoid scheduling an extra
+ * reconnect attempt in bust_connection()
+ * by setting the indicator.
+ */
+ if (!FLD_ISSET(db_rep->region->config,
+ REP_C_2SITE_STRICT) ||
+ IS_PREFMAS_MODE(env))
+ db_rep->rejoin_pending = TRUE;
+ if ((ret = __repmgr_defer_op(env,
+ REPMGR_REJOIN)) == 0)
+ ret = DB_REP_UNAVAIL;
+ }
} else
ret = DB_REP_UNAVAIL;
DB_ASSERT(env, ret != 0);
@@ -1332,7 +1860,6 @@ process_own_msg(env, conn)
if ((ret = __repmgr_parm_refresh_unmarshal(env,
&parms, dbt->data, dbt->size, NULL)) != 0)
return (DB_REP_UNAVAIL);
- db_rep = env->rep_handle;
DB_ASSERT(env, conn->type == REP_CONNECTION &&
IS_KNOWN_REMOTE_SITE(conn->eid));
site = SITE_FROM_EID(conn->eid);
@@ -1348,8 +1875,15 @@ process_own_msg(env, conn)
case REPMGR_GM_FORWARD:
case REPMGR_JOIN_REQUEST:
case REPMGR_JOIN_SUCCESS:
+ case REPMGR_LSNHIST_REQUEST:
+ case REPMGR_LSNHIST_RESPONSE:
+ case REPMGR_PREFMAS_FAILURE:
+ case REPMGR_PREFMAS_SUCCESS:
+ case REPMGR_READONLY_MASTER:
+ case REPMGR_READONLY_RESPONSE:
case REPMGR_REMOVE_REQUEST:
case REPMGR_RESOLVE_LIMBO:
+ case REPMGR_RESTART_CLIENT:
default:
__db_errx(env, DB_STR_A("3677",
"unexpected msg type %lu in process_own_msg", "%lu"),
@@ -1482,6 +2016,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags)
cntrl_len = __REPMGR_V3HANDSHAKE_SIZE;
break;
case 4:
+ case 5:
+ case 6:
cntrl_len = __REPMGR_HANDSHAKE_SIZE;
break;
default:
@@ -1513,6 +2049,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags)
__repmgr_v3handshake_marshal(env, &v3hs, p);
break;
case 4:
+ case 5:
+ case 6:
hs.port = my_addr->port;
hs.alignment = MEM_ALIGN;
hs.ack_policy = (u_int32_t)rep->perm_policy;
@@ -1551,11 +2089,14 @@ read_version_response(env, conn)
DB_REP *db_rep;
__repmgr_version_confirmation_args conf;
DBT vi;
+ REP *rep;
+ REPMGR_SITE *site;
char *hostname;
u_int32_t flags;
- int ret;
+ int ret, subord;
db_rep = env->rep_handle;
+ rep = db_rep->region;
if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
return (ret);
@@ -1581,14 +2122,37 @@ read_version_response(env, conn)
return (DB_REP_UNAVAIL);
}
- if ((ret = accept_handshake(env, conn, hostname)) != 0)
+ if ((ret = accept_handshake(env, conn, hostname, &subord)) != 0)
return (ret);
- flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
+ if (!IS_SUBORDINATE(db_rep))
+ flags = 0;
+ else {
+ flags = REPMGR_SUBORDINATE;
+ if (FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER) &&
+ db_rep->repmgr_status == running)
+ /*
+ * Takeover is enabled in rep-aware subordinate
+ * process.
+ */
+ flags |= REPMGR_AUTOTAKEOVER;
+ }
if ((ret = __repmgr_send_handshake(env,
conn, NULL, 0, flags)) != 0)
return (ret);
}
conn->state = CONN_READY;
+ site = SITE_FROM_EID(conn->eid);
+ /*
+ * Do not increase sites_avail redundantly for a new outgoing
+ * connection from a subordinate process.
+ */
+ if (conn->type == REP_CONNECTION &&
+ site->state == SITE_CONNECTED && !IS_SUBORDINATE(db_rep)) {
+ rep->sites_avail++;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "vers_resp: EID %lu CONNECTED, READY. sites_avail %lu",
+ (u_long)conn->eid, (u_long)rep->sites_avail));
+ }
return (ret);
}
@@ -1641,10 +2205,11 @@ __repmgr_find_version_info(env, conn, vi)
}
static int
-accept_handshake(env, conn, hostname)
+accept_handshake(env, conn, hostname, subordinate)
ENV *env;
REPMGR_CONNECTION *conn;
char *hostname;
+ int *subordinate;
{
__repmgr_handshake_args hs;
__repmgr_v2handshake_args hs2;
@@ -1653,6 +2218,7 @@ accept_handshake(env, conn, hostname)
u_int32_t ack, flags;
int electable;
+ *subordinate = 0;
switch (conn->version) {
case 2:
if (__repmgr_v2handshake_unmarshal(env, &hs2,
@@ -1674,6 +2240,8 @@ accept_handshake(env, conn, hostname)
ack = 0;
break;
case 4:
+ case 5:
+ case 6:
if (__repmgr_handshake_unmarshal(env, &hs,
conn->input.repmgr_msg.cntrl.data,
conn->input.repmgr_msg.cntrl.size, NULL) != 0)
@@ -1682,6 +2250,8 @@ accept_handshake(env, conn, hostname)
electable = F_ISSET(&hs, ELECTABLE_SITE);
flags = hs.flags;
ack = hs.ack_policy;
+ if (LF_ISSET(REPMGR_SUBORDINATE))
+ *subordinate = 1;
break;
default:
__db_errx(env, DB_STR_A("3679",
@@ -1729,13 +2299,17 @@ process_parameters(env, conn, host, port, ack, electable, flags)
u_int32_t ack, flags;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_RETRY *retry;
REPMGR_SITE *site;
+ SITEINFO *sites;
__repmgr_connect_reject_args reject;
+ __repmgr_v4connect_reject_args v4reject;
u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE];
int eid, ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
/* Connection state can be used to discern incoming versus outgoing. */
if (conn->state == CONN_CONNECTED) {
@@ -1785,6 +2359,13 @@ process_parameters(env, conn, host, port, ack, electable, flags)
TAILQ_INSERT_TAIL(&site->sub_conns,
conn, entries);
conn->eid = eid;
+ conn->auto_takeover =
+ LF_ISSET(REPMGR_AUTOTAKEOVER) ? 1 : 0;
+ SET_LISTENER_CAND(conn->auto_takeover, ++);
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "handshake from subordinate %sconnection at site %s:%u EID %u",
+ LF_ISSET(REPMGR_AUTOTAKEOVER)?
+ "takeover ": "", host, port, eid));
} else {
DB_EVENT(env,
DB_EVENT_REP_CONNECT_ESTD, &eid);
@@ -1797,6 +2378,7 @@ process_parameters(env, conn, host, port, ack, electable, flags)
TAILQ_REMOVE(&db_rep->retries,
retry, entries);
__os_free(env, retry);
+ site->ref.retry = NULL;
break;
case SITE_CONNECTED:
/*
@@ -1821,6 +2403,16 @@ process_parameters(env, conn, host, port, ack, electable, flags)
* don't have to do anything else here.
*/
break;
+ case SITE_IDLE:
+ /*
+ * This can occur after the heartbeat
+ * test hook artificially kept this
+ * site from first trying to connect.
+ */
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "handshake from idle site %s:%u EID %u",
+ host, port, eid));
+ break;
default:
DB_ASSERT(env, FALSE);
}
@@ -1834,10 +2426,18 @@ process_parameters(env, conn, host, port, ack, electable, flags)
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"rejecting connection from unknown or provisional site %s:%u",
host, port));
- reject.version = db_rep->membership_version;
- reject.gen = db_rep->member_version_gen;
- __repmgr_connect_reject_marshal(env,
- &reject, reject_buf);
+ if (conn->version < 5) {
+ v4reject.version = db_rep->membership_version;
+ v4reject.gen = db_rep->member_version_gen;
+ __repmgr_v4connect_reject_marshal(env,
+ &v4reject, reject_buf);
+ } else {
+ reject.version = db_rep->membership_version;
+ reject.gen = db_rep->member_version_gen;
+ reject.status = (site) ? site->membership : 0;
+ __repmgr_connect_reject_marshal(env,
+ &reject, reject_buf);
+ }
if ((ret = __repmgr_send_own_msg(env, conn,
REPMGR_CONNECT_REJECT, reject_buf,
@@ -1867,7 +2467,8 @@ process_parameters(env, conn, host, port, ack, electable, flags)
*/
if (!IS_SUBORDINATE(db_rep) && /* us */
!__repmgr_master_is_known(env) &&
- !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */
+ !LF_ISSET(REPMGR_SUBORDINATE) && /* the remote site */
+ !IS_PREFMAS_MODE(env)) {
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"handshake with no known master to wake election thread"));
db_rep->new_connection = TRUE;
@@ -1980,6 +2581,7 @@ record_permlsn(env, conn)
*/
if (ackp->lsn.file > site->max_ack.file)
do_log_check = 1;
+ site->max_ack_gen = ackp->generation;
memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
if (do_log_check)
check_min_log_file(env);