diff options
| author | Lorry Tar Creator <lorry-tar-importer@baserock.org> | 2015-02-17 17:25:57 +0000 |
|---|---|---|
| committer | <> | 2015-03-17 16:26:24 +0000 |
| commit | 780b92ada9afcf1d58085a83a0b9e6bc982203d1 (patch) | |
| tree | 598f8b9fa431b228d29897e798de4ac0c1d3d970 /src/repmgr/repmgr_sel.c | |
| parent | 7a2660ba9cc2dc03a69ddfcfd95369395cc87444 (diff) | |
| download | berkeleydb-master.tar.gz | |
Diffstat (limited to 'src/repmgr/repmgr_sel.c')
| -rw-r--r-- | src/repmgr/repmgr_sel.c | 726 |
1 files changed, 664 insertions, 62 deletions
diff --git a/src/repmgr/repmgr_sel.c b/src/repmgr/repmgr_sel.c index ba14368f..c32dad25 100644 --- a/src/repmgr/repmgr_sel.c +++ b/src/repmgr/repmgr_sel.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -12,7 +12,7 @@ typedef int (*HEARTBEAT_ACTION) __P((ENV *)); -static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); +static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *, int *)); static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); static void check_min_log_file __P((ENV *)); static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *)); @@ -23,13 +23,18 @@ static int process_parameters __P((ENV *, static int read_version_response __P((ENV *, REPMGR_CONNECTION *)); static int record_permlsn __P((ENV *, REPMGR_CONNECTION *)); static int __repmgr_call_election __P((ENV *)); +static int __repmgr_check_listener __P((ENV *)); +static int __repmgr_check_master_listener __P((ENV *)); static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *)); static void *__repmgr_connector_thread __P((void *)); static int __repmgr_next_timeout __P((ENV *, db_timespec *, HEARTBEAT_ACTION *)); +static int __repmgr_reset_last_rcvd __P((ENV *)); static int __repmgr_retry_connections __P((ENV *)); static int __repmgr_send_heartbeat __P((ENV *)); -static int __repmgr_try_one __P((ENV *, int)); +static int __repmgr_start_takeover __P((ENV *)); +static void *__repmgr_takeover_thread __P((void *)); +static int __repmgr_try_one __P((ENV *, int, int)); static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *)); static int send_version_response __P((ENV *, REPMGR_CONNECTION *)); @@ -49,17 +54,24 @@ void * __repmgr_select_thread(argsp) void *argsp; { - REPMGR_RUNNABLE *args; ENV *env; + DB_THREAD_INFO *ip; int ret; + REPMGR_RUNNABLE *args; args = argsp; env = args->env; + ip = NULL; + ret = 0; - if ((ret = __repmgr_select_loop(env)) != 0) { + ENV_ENTER_RET(env, ip, ret); + if (ret != 0 || (ret = __repmgr_select_loop(env)) != 0) { __db_err(env, ret, DB_STR("3614", "select loop failed")); + ENV_LEAVE(env, ip); (void)__repmgr_thread_failure(env, ret); } + if (ret == 0) + ENV_LEAVE(env, ip); return (NULL); } @@ -71,12 +83,19 @@ __repmgr_bow_out(env) ENV *env; { DB_REP *db_rep; + REP *rep; int ret; db_rep = env->rep_handle; + rep = db_rep->region; LOCK_MUTEX(db_rep->mutex); ret = __repmgr_stop_threads(env); UNLOCK_MUTEX(db_rep->mutex); + /* + * Reset sites_avail so that it will be calculated correctly if this + * site rejoins the group in the future. + */ + rep->sites_avail = 0; DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL); return (ret); } @@ -187,23 +206,53 @@ __repmgr_compute_timeout(env, timeout) db_rep = env->rep_handle; /* - * There are two factors to consider: are heartbeats in use? and, do we + * There are four factors to consider: are heartbeats in use? do we * have any sites with broken connections that we ought to retry? + * is there a listener process running locally? do we need to call + * an election if no master listener exists? */ have_timeout = __repmgr_next_timeout(env, &t, NULL); /* List items are in order, so we only have to examine the first one. */ if (!TAILQ_EMPTY(&db_rep->retries)) { retry = TAILQ_FIRST(&db_rep->retries); - if (have_timeout) { + if (have_timeout) /* Choose earliest timeout deadline. */ t = timespeccmp(&retry->time, &t, <) ? retry->time : t; - } else { + else { t = retry->time; have_timeout = TRUE; } } + /* Check listener every timeout in subordinate rep-aware process. */ + if (IS_LISTENER_CAND(db_rep)) { + if (!timespecisset(&db_rep->l_listener_chk)) { + __os_gettime(env, &now, 1); + TIMESPEC_ADD_DB_TIMEOUT(&now, db_rep->l_listener_wait); + db_rep->l_listener_chk = now; + } + if (have_timeout) + t = timespeccmp(&db_rep->l_listener_chk, &t, <) ? + db_rep->l_listener_chk : t; + else { + t = db_rep->l_listener_chk; + have_timeout = TRUE; + } + } + + /* Check master listener if needed. */ + if (FLD_ISSET(db_rep->region->config, REP_C_AUTOTAKEOVER) && + timespecisset(&db_rep->m_listener_chk)) { + if (have_timeout) + t = timespeccmp(&db_rep->m_listener_chk, &t, <) ? + db_rep->m_listener_chk : t; + else { + t = db_rep->m_listener_chk; + have_timeout = TRUE; + } + } + if (have_timeout) { __os_gettime(env, &now, 1); if (timespeccmp(&now, &t, >=)) @@ -242,7 +291,17 @@ __repmgr_next_timeout(env, deadline, action) if (rep->master_id == db_rep->self_eid && rep->heartbeat_frequency > 0) { - t = db_rep->last_bcast; + /* + * A temporary master in preferred master mode must send + * regular heartbeats regardless of other activity because + * the preferred master requires a heartbeat to take over as + * master after it has synced with the temporary master. + */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT)) + t = db_rep->last_hbeat; + else + t = db_rep->last_bcast; TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency); my_action = __repmgr_send_heartbeat; } else if ((master = __repmgr_connected_master(env)) != NULL && @@ -301,6 +360,24 @@ __repmgr_send_heartbeat(env) db_rep = env->rep_handle; rep = db_rep->region; + ret = 0; + + /* + * Check test hook preventing heartbeats and connection attempts. + * This is used to create and maintain a dupmaster condition in + * a test until the test hook is rescinded. + */ + DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT); + + /* + * Track last heartbeat for temporary master in preferred master + * mode so that it will send regular heartbeats regardless of + * other activity. + */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) && + rep->master_id == db_rep->self_eid) + __os_gettime(env, &db_rep->last_hbeat, 1); permlsn.generation = rep->gen; if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0) @@ -310,8 +387,11 @@ __repmgr_send_heartbeat(env) control.size = __REPMGR_PERMLSN_SIZE; DB_INIT_DBT(rec, NULL, 0); - return (__repmgr_send_broadcast(env, - REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3)); + ret =__repmgr_send_broadcast(env, + REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3); + +DB_TEST_RECOVERY_LABEL + return (ret); } /* @@ -373,6 +453,8 @@ __repmgr_check_timeouts(env) HEARTBEAT_ACTION action; int ret; + ret = 0; + /* * Figure out the next heartbeat-related thing to be done. Then, if * it's time to do it, do so. @@ -384,7 +466,342 @@ __repmgr_check_timeouts(env) return (ret); } - return (__repmgr_retry_connections(env)); + /* Check the existence of local listener. */ + if ((ret = __repmgr_check_listener(env)) != 0) + return (ret); + + /* Check the existence of master listener. */ + if ((ret = __repmgr_check_master_listener(env)) != 0) + return (ret); + + /* + * Check test hook preventing heartbeats and connection attempts. + * This is used to create and maintain a dupmaster condition in + * a test until the test hook is rescinded. + */ + DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT); + + ret = __repmgr_retry_connections(env); + +DB_TEST_RECOVERY_LABEL + return (ret); +} + +/* + * Check the existence of the listener process on the local site. If one + * does not exist and the current process is a subordinate rep-aware process, + * then start a takeover thread to covert this process to the listener process. + */ +static int +__repmgr_check_listener(env) + ENV *env; +{ + DB_REP *db_rep; + REP *rep; + SITEINFO *sites; + db_timespec t; + int ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + ret = 0; + + /* + * Only subordinate rep-aware process can take over listener role, so + * no need to check listener in listener process or rep unaware process. + */ + if (!IS_LISTENER_CAND(db_rep)) + return (0); + + /* + * If the listener quits due to site removal, no subordinate process + * should take over as listener as the current site is not expected + * to be active in the group. Check the status from the site array + * in the shared region instead of that in the GMDB. We do this + * because the GMDB doesn't apply the change yet when replication + * is stopped on the removed site. + */ + sites = R_ADDR(env->reginfo, rep->siteinfo_off); + if (sites[rep->self_eid].status == SITE_DELETING) + return (0); + + /* + * Check the listener after timeout. If there is no listener, we + * take over. During takeover, we will refresh all connections. + * A subordinate process does not have an up-to-date site list, so sync + * up addresses from the in-memory site array before takeover. + */ + __os_gettime(env, &t, 1); + if (timespeccmp(&t, &db_rep->l_listener_chk, >=)) { + /* Compute the next timeout. */ + TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->l_listener_wait); + db_rep->l_listener_chk = t; + + /* Check if site address information needs to be refreshed. */ + if ((rep->siteinfo_seq > db_rep->siteinfo_seq) && + (ret = __repmgr_sync_siteaddr(env)) != 0) + return (ret); + + if (rep->listener == 0) + ret = __repmgr_start_takeover(env); + } + return (ret); +} + +/* + * Start a thread to take over the listener role in the current subordinate + * process. + */ +static int +__repmgr_start_takeover(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_RUNNABLE *th; + int ret; + + db_rep = env->rep_handle; + th = db_rep->takeover_thread; + if (th == NULL) { + if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), + &th)) != 0) + return (ret); + db_rep->takeover_thread = th; + } else if (th->finished) { + if ((ret = __repmgr_thread_join(th)) != 0) + return (ret); + } else { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "takeover thread still running")); + return (0); + } + th->run = __repmgr_takeover_thread; + if ((ret = __repmgr_thread_start(env, th)) != 0) { + __os_free(env, th); + db_rep->takeover_thread = NULL; + } + return (ret); +} + +/* + * Take over listener role in the current subordinate process. + */ +static void * +__repmgr_takeover_thread(argsp) + void *argsp; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + ENV *env; + REP *rep; + REPMGR_RUNNABLE *th; + int nthreads, ret, save_policy; + + th = argsp; + env = th->env; + db_rep = env->rep_handle; + ip = NULL; + rep = db_rep->region; + ret = 0; + + ENV_ENTER_RET(env, ip, ret); + if (ret != 0) + goto out; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, "starting takeover thread")); + /* + * It is likely that there is an old heartbeat ready to expire + * immediately upon restarting repmgr, leading to an unnecessary + * election. Reset the expiration countdown here to avoid this. + */ + if ((ret = __repmgr_reset_last_rcvd(env)) != 0) + goto out; + /* + * If nthreads is set to be 0 in the current subordinate process, use + * the value in the last listener. The nthreads should be larger than + * 0 in listener. + */ + nthreads = db_rep->config_nthreads == 0 ? (int)rep->listener_nthreads : + db_rep->config_nthreads; + /* + * It is possible that this subordinate process does not have intact + * connections to the other sites. For most ack policies, restarting + * repmgr will wait for acks when it commits its transaction to reload + * the gmdb. Temporarily set the ack policy to NONE for the takeover + * so that it is not delayed waiting for acks that can never come. + */ + save_policy = rep->perm_policy; + rep->perm_policy = DB_REPMGR_ACKS_NONE; + /* + * Restart the repmgr as listener. If DB_REP_IGNORE is returned, + * the current process has become listener. If DB_REP_UNAVAIL is + * returned, the site has been removed from the group and no listener + * should be started. For any other error, if the replication is + * stopped because of the takeover thread, we will notify the + * application. + */ + ret = __repmgr_start_int(env, nthreads, F_ISSET(rep, REP_F_MASTER) ? + DB_REP_MASTER : DB_REP_CLIENT); + if (ret == 0 && !IS_SUBORDINATE(db_rep) && + db_rep->repmgr_status == running) { + STAT(rep->mstat.st_takeovers++); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "finished takeover and became listener")); + } else if (ret != 0 && db_rep->repmgr_status == stopped) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "failed to take over, repmgr was stopped")); + DB_EVENT(env, DB_EVENT_REP_AUTOTAKEOVER_FAILED, NULL); + } else { + /* The current process is not changed to listener. */ + RPRINT(env, (env, DB_VERB_REPMGR_MISC, "failed to take over")); + } + rep->perm_policy = save_policy; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, "takeover thread is exiting")); + ENV_LEAVE(env, ip); +out: th->finished = TRUE; + return (NULL); +} + +/* + * Reset the last_rcvd_timestamp to restart the wait for a heartbeat + * monitor expiration. + */ +static int +__repmgr_reset_last_rcvd(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_SITE *master; + + db_rep = env->rep_handle; + + LOCK_MUTEX(db_rep->mutex); + if ((master = __repmgr_connected_master(env)) != NULL) + __os_gettime(env, &master->last_rcvd_timestamp, 1); + UNLOCK_MUTEX(db_rep->mutex); + return (0); +} + +/* + * Monitor the connection to master listener. When the master listener is + * disconnected and some other master process might take over as listener + * soon, we will delay the election. After the delay if there is still no + * connection from master listener, call an election then. + */ +static int +__repmgr_check_master_listener(env) + ENV *env; +{ + DB_REP *db_rep; + REP *rep; + REPMGR_SITE *master; + db_timespec t; + u_int32_t flags; + int ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + ret = 0; + + /* + * We only check for a master listener if m_listener_chk is set. + * The field is only set when __repmgr_bust_connection() previously + * detected the loss of our connection to the master listener. + * If rep->master_id is invalid, wait until it is ready to check. + */ + if (!FLD_ISSET((db_rep)->region->config, REP_C_AUTOTAKEOVER) || + !timespecisset(&db_rep->m_listener_chk) || + !IS_VALID_EID(rep->master_id)) + return (0); + + __os_gettime(env, &t, 1); + if (timespeccmp(&t, &db_rep->m_listener_chk, >=)) { + master = SITE_FROM_EID(db_rep->region->master_id); + if (master->ref.conn.out == NULL && + master->ref.conn.in == NULL) { + flags = ELECT_F_EVENT_NOTIFY; + if (FLD_ISSET(db_rep->region->config, REP_C_ELECTIONS)) + LF_SET(ELECT_F_IMMED | ELECT_F_FAST); + else + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Master failure, but no elections")); + + /* + * In preferred master mode, a client that has lost its + * connection to the master uses an election thread to + * restart as master. + */ + if (IS_PREFMAS_MODE(env)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, +"check_master_listener setting preferred master temp master")); + db_rep->prefmas_pending = start_temp_master; + } + + ret = __repmgr_init_election(env, flags); + } + /* + * If the delay has expired reset m_listener_chk. We reset + * it whether or not the master listener process comes back + * so that we will not continue checking for a master listener + * indefinitely. + */ + timespecclear(&db_rep->m_listener_chk); + } + return (ret); +} + +/* + * Wake up I/O waiting in selector thread, refresh connections to all connected + * and present sites. + * + * PUBLIC: int __repmgr_refresh_selector __P((ENV *)); + */ +int +__repmgr_refresh_selector(env) + ENV *env; +{ + DB_REP *db_rep; + REP *rep; + REPMGR_RETRY *retry; + REPMGR_SITE *site; + SITEINFO *sites; + int eid, ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + + if ((ret = __repmgr_wake_main_thread(env)) != 0) + return (ret); + + FOR_EACH_REMOTE_SITE_INDEX(eid) { + SET_LISTENER_CAND(1, = 0); + site = SITE_FROM_EID(eid); + + /* + * It is possible some sites were left in a paused state + * during the switch, so they have to be removed from the + * retry list. + */ + if (site->state == SITE_PAUSING) { + retry = site->ref.retry; + if (retry != NULL) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Removing site from retry list eid %lu", + (u_long)eid)); + TAILQ_REMOVE(&db_rep->retries, retry, entries); + __os_free(env, retry); + site->ref.retry = NULL; + } + + } + /* + * Try to connect to any site that is now PRESENT after + * rereading the gmdb. + */ + if (site->membership == SITE_PRESENT && + (ret = __repmgr_try_one(env, eid, TRUE)) != 0) + return (ret); + } + return (0); } /* @@ -415,10 +832,11 @@ __repmgr_retry_connections(env) __os_free(env, retry); DB_ASSERT(env, IS_VALID_EID(eid)); site = SITE_FROM_EID(eid); + site->ref.retry = NULL; DB_ASSERT(env, site->state == SITE_PAUSING); if (site->membership == SITE_PRESENT) { - if ((ret = __repmgr_try_one(env, eid)) != 0) + if ((ret = __repmgr_try_one(env, eid, FALSE)) != 0) return (ret); } else site->state = SITE_IDLE; @@ -437,11 +855,23 @@ __repmgr_first_try_connections(env) ENV *env; { DB_REP *db_rep; + REP *rep; REPMGR_SITE *site; + SITEINFO *sites; int eid, ret; db_rep = env->rep_handle; + rep = db_rep->region; + + /* + * Check test hook preventing heartbeats and connection attempts. + * This is used to create and maintain a dupmaster condition in + * a test until the test hook is rescinded. + */ + DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT); + FOR_EACH_REMOTE_SITE_INDEX(eid) { + SET_LISTENER_CAND(1, = 0); site = SITE_FROM_EID(eid); /* * Normally all sites would be IDLE here. But if a user thread @@ -453,19 +883,22 @@ __repmgr_first_try_connections(env) */ if (site->state == SITE_IDLE && site->membership == SITE_PRESENT && - (ret = __repmgr_try_one(env, eid)) != 0) + (ret = __repmgr_try_one(env, eid, FALSE)) != 0) return (ret); } +DB_TEST_RECOVERY_LABEL return (0); } /* - * Starts a thread to open a connection to the site at the given EID. + * Starts a thread to open a connection to the site at the given EID. We might + * have no connection to the site, or an existing connection to be replaced. */ static int -__repmgr_try_one(env, eid) +__repmgr_try_one(env, eid, refresh) ENV *env; int eid; + int refresh; { DB_REP *db_rep; REPMGR_SITE *site; @@ -488,13 +921,22 @@ __repmgr_try_one(env, eid) "eid %lu previous connector thread still running; will retry", (u_long)eid)); return (__repmgr_schedule_connection_attempt(env, - eid, FALSE)); + eid, refresh)); } site->state = SITE_CONNECTING; th->run = __repmgr_connector_thread; - th->args.eid = eid; + th->args.conn_th.eid = eid; + /* + * The flag CONNECT_F_REFRESH indicates an immediate connection attempt + * should be scheduled if the current connection attempt fails. It is + * turned on before the first attempt to refresh the connection but + * turned off if the first attempt fails. In this way, when refreshing + * the connection, there will be at most two immediate connection + * attempts, after that, retry as usual. + */ + th->args.conn_th.flags = refresh ? CONNECT_F_REFRESH : 0; if ((ret = __repmgr_thread_start(env, th)) != 0) { __os_free(env, th); site->connector = NULL; @@ -506,21 +948,33 @@ static void * __repmgr_connector_thread(argsp) void *argsp; { - REPMGR_RUNNABLE *th; ENV *env; + DB_THREAD_INFO *ip; + REPMGR_RUNNABLE *th; int ret; th = argsp; env = th->env; + ip = NULL; + ret = 0; - RPRINT(env, (env, DB_VERB_REPMGR_MISC, - "starting connector thread, eid %u", th->args.eid)); - if ((ret = __repmgr_connector_main(env, th)) != 0) { + ENV_ENTER_RET(env, ip, ret); + if (ret == 0) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "starting connector thread, eid %u", + th->args.conn_th.eid)); + if (ret != 0 || (ret = __repmgr_connector_main(env, th)) != 0) { __db_err(env, ret, DB_STR("3617", "connector thread failed")); + RPRINT(env, (env, + DB_VERB_REPMGR_MISC, "connector thread is exiting")); + ENV_LEAVE(env, ip); (void)__repmgr_thread_failure(env, ret); } - RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting")); - + if (ret == 0) { + RPRINT(env, (env, + DB_VERB_REPMGR_MISC, "connector thread is exiting")); + ENV_LEAVE(env, ip); + } th->finished = TRUE; return (NULL); } @@ -542,8 +996,8 @@ __repmgr_connector_main(env, th) ret = 0; LOCK_MUTEX(db_rep->mutex); - DB_ASSERT(env, IS_VALID_EID(th->args.eid)); - site = SITE_FROM_EID(th->args.eid); + DB_ASSERT(env, IS_VALID_EID(th->args.conn_th.eid)); + site = SITE_FROM_EID(th->args.conn_th.eid); if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped) goto unlock; @@ -563,7 +1017,8 @@ __repmgr_connector_main(env, th) UNLOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) { - DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &th->args.eid); + DB_EVENT(env, + DB_EVENT_REP_CONNECT_ESTD, &th->args.conn_th.eid); LOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) { __db_err(env, ret, DB_STR("3618", @@ -571,33 +1026,53 @@ __repmgr_connector_main(env, th) goto cleanup; } conn->type = REP_CONNECTION; - site = SITE_FROM_EID(th->args.eid); + site = SITE_FROM_EID(th->args.conn_th.eid); if (site->state != SITE_CONNECTING || db_rep->repmgr_status == stopped) goto cleanup; - conn->eid = th->args.eid; - site = SITE_FROM_EID(th->args.eid); - site->ref.conn.out = conn; + conn->eid = th->args.conn_th.eid; + site = SITE_FROM_EID(th->args.conn_th.eid); + /* + * If there is an existing outgoing connection, disable it and + * replace it with a new connection. The sites for a formerly + * subordinate handle that is now taking over might still be + * SITE_CONNECTING. Set to SITE_CONNECTED before disabling + * connection so that sites_avail is correctly maintained. + */ site->state = SITE_CONNECTED; + if (site->ref.conn.out != NULL) + (void)__repmgr_disable_connection(env, + site->ref.conn.out); + site->ref.conn.out = conn; __os_gettime(env, &site->last_rcvd_timestamp, 1); ret = __repmgr_wake_main_thread(env); } else if (ret == DB_REP_UNAVAIL) { /* Retryable error while trying to connect: retry later. */ - info.eid = th->args.eid; + info.eid = th->args.conn_th.eid; info.error = err; DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info); STAT(db_rep->region->mstat.st_connect_fail++); LOCK_MUTEX(db_rep->mutex); - site = SITE_FROM_EID(th->args.eid); + site = SITE_FROM_EID(th->args.conn_th.eid); if (site->state != SITE_CONNECTING || db_rep->repmgr_status == stopped) { ret = 0; goto unlock; } + /* + * If it fails to create a new outgoing connection to replace + * the existing one in the first attempt, schedule another + * immediate attempt. If it is our second attempt, disable + * the existing connections and retry as normal. + */ + if (site->ref.conn.out != NULL && th->args.conn_th.flags == 0) + (void)__repmgr_disable_connection(env, + site->ref.conn.out); ret = __repmgr_schedule_connection_attempt(env, - th->args.eid, FALSE); + th->args.conn_th.eid, + th->args.conn_th.flags == CONNECT_F_REFRESH); } else goto out; @@ -842,6 +1317,7 @@ prepare_input(env, conn) if ((ret = __os_malloc(env, memsize, &membase)) != 0) return (ret); conn->input.rep_message = membase; + conn->input.rep_message->size = memsize; conn->input.rep_message->msg_hdr = msg_hdr; conn->input.rep_message->v.repmsg.originating_eid = conn->eid; @@ -876,6 +1352,7 @@ prepare_input(env, conn) if ((ret = __os_malloc(env, memsize, &membase)) != 0) return (ret); conn->input.rep_message = membase; + conn->input.rep_message->size = memsize; conn->input.rep_message->msg_hdr = msg_hdr; conn->input.rep_message->v.appmsg.conn = conn; @@ -891,6 +1368,7 @@ prepare_input(env, conn) if ((ret = __os_malloc(env, size, &membase)) != 0) return (ret); conn->input.rep_message = membase; + conn->input.rep_message->size = size; conn->input.rep_message->msg_hdr = msg_hdr; /* @@ -1065,16 +1543,18 @@ dispatch_msgin(env, conn) ENV *env; REPMGR_CONNECTION *conn; { + DBT *dbt; DB_REP *db_rep; - REPMGR_SITE *site; - REPMGR_RUNNABLE *th; + REP *rep; REPMGR_RESPONSE *resp; - DBT *dbt; + REPMGR_RUNNABLE *th; + REPMGR_SITE *site; char *hostname; - int eid, ret; + int eid, ret, subord; DB_ASSERT(env, conn->reading_phase == DATA_PHASE); db_rep = env->rep_handle; + rep = db_rep->region; switch (conn->state) { case CONN_CONNECTED: @@ -1129,9 +1609,22 @@ dispatch_msgin(env, conn) dbt = &conn->input.repmgr_msg.rec; hostname = dbt->data; hostname[dbt->size-1] = '\0'; - if ((ret = accept_handshake(env, conn, hostname)) != 0) + if ((ret = accept_handshake(env, + conn, hostname, &subord)) != 0) return (ret); conn->state = CONN_READY; + site = SITE_FROM_EID(conn->eid); + /* + * Do not increase sites_avail redundantly for an + * incoming subordinate connection. + */ + if (conn->type == REP_CONNECTION && + site->state == SITE_CONNECTED && !subord) { + rep->sites_avail++; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "msgin: EID %lu CONNECTED, READY. sites_avail %lu", + (u_long)conn->eid, (u_long)rep->sites_avail)); + } break; case REPMGR_OWN_MSG: /* @@ -1279,9 +1772,11 @@ process_own_msg(env, conn) REPMGR_SITE *site; REPMGR_MESSAGE *msg; __repmgr_connect_reject_args reject; + __repmgr_v4connect_reject_args v4reject; __repmgr_parm_refresh_args parms; int ret; + db_rep = env->rep_handle; ret = 0; /* * Set "msg" to point to the message struct. If we do all necessary @@ -1293,28 +1788,61 @@ process_own_msg(env, conn) switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) { case REPMGR_CONNECT_REJECT: dbt = &msg->v.gmdb_msg.request; - if ((ret = __repmgr_connect_reject_unmarshal(env, - &reject, dbt->data, dbt->size, NULL)) != 0) - return (DB_REP_UNAVAIL); + if (conn->version < 5) { + if ((ret = __repmgr_v4connect_reject_unmarshal(env, + &v4reject, dbt->data, dbt->size, NULL)) != 0) + return (DB_REP_UNAVAIL); + reject.version = v4reject.version; + reject.gen = v4reject.gen; + reject.status = 0; + } else { + if ((ret = __repmgr_connect_reject_unmarshal(env, + &reject, dbt->data, dbt->size, NULL)) != 0) + return (DB_REP_UNAVAIL); + } /* * If we're being rejected by someone who has more up-to-date - * membership information than we do, it means we have been - * removed from the group. If we've just gotten started, we can - * make one attempt at automatically rejoining; otherwise we bow - * out gracefully. + * membership information than we do, it means we are not in + * the group. If we've just gotten started, or our status is + * adding, we can make one attempt at automatically rejoining; + * otherwise we bow out gracefully. */ RPRINT(env, (env, DB_VERB_REPMGR_MISC, - "got rejection msg citing version %lu/%lu", - (u_long)reject.gen, (u_long)reject.version)); + "got rejection msg citing version %lu/%lu mine %lu/%lu membership %lu", + (u_long)reject.gen, (u_long)reject.version, + (u_long)db_rep->member_version_gen, + (u_long)db_rep->membership_version, + (u_long)reject.status)); if (__repmgr_gmdb_version_cmp(env, reject.gen, reject.version) > 0) { - if (env->rep_handle->seen_repmsg) + if (db_rep->seen_repmsg && reject.status != SITE_ADDING) ret = DB_DELETED; - else if ((ret = __repmgr_defer_op(env, - REPMGR_REJOIN)) == 0) - ret = DB_REP_UNAVAIL; + else { + /* + * If 2SITE_STRICT is off, we are likely to + * win an election with our own vote before + * discovering there is already a master. + * Set indicator to defer the election until + * after rejoining group. + * + * In preferred master mode, either site + * should defer the election (which + * executes the preferred master startup + * code and only calls an election if it is + * safe) and also avoid scheduling an extra + * reconnect attempt in bust_connection() + * by setting the indicator. + */ + if (!FLD_ISSET(db_rep->region->config, + REP_C_2SITE_STRICT) || + IS_PREFMAS_MODE(env)) + db_rep->rejoin_pending = TRUE; + if ((ret = __repmgr_defer_op(env, + REPMGR_REJOIN)) == 0) + ret = DB_REP_UNAVAIL; + } } else ret = DB_REP_UNAVAIL; DB_ASSERT(env, ret != 0); @@ -1332,7 +1860,6 @@ process_own_msg(env, conn) if ((ret = __repmgr_parm_refresh_unmarshal(env, &parms, dbt->data, dbt->size, NULL)) != 0) return (DB_REP_UNAVAIL); - db_rep = env->rep_handle; DB_ASSERT(env, conn->type == REP_CONNECTION && IS_KNOWN_REMOTE_SITE(conn->eid)); site = SITE_FROM_EID(conn->eid); @@ -1348,8 +1875,15 @@ process_own_msg(env, conn) case REPMGR_GM_FORWARD: case REPMGR_JOIN_REQUEST: case REPMGR_JOIN_SUCCESS: + case REPMGR_LSNHIST_REQUEST: + case REPMGR_LSNHIST_RESPONSE: + case REPMGR_PREFMAS_FAILURE: + case REPMGR_PREFMAS_SUCCESS: + case REPMGR_READONLY_MASTER: + case REPMGR_READONLY_RESPONSE: case REPMGR_REMOVE_REQUEST: case REPMGR_RESOLVE_LIMBO: + case REPMGR_RESTART_CLIENT: default: __db_errx(env, DB_STR_A("3677", "unexpected msg type %lu in process_own_msg", "%lu"), @@ -1482,6 +2016,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags) cntrl_len = __REPMGR_V3HANDSHAKE_SIZE; break; case 4: + case 5: + case 6: cntrl_len = __REPMGR_HANDSHAKE_SIZE; break; default: @@ -1513,6 +2049,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags) __repmgr_v3handshake_marshal(env, &v3hs, p); break; case 4: + case 5: + case 6: hs.port = my_addr->port; hs.alignment = MEM_ALIGN; hs.ack_policy = (u_int32_t)rep->perm_policy; @@ -1551,11 +2089,14 @@ read_version_response(env, conn) DB_REP *db_rep; __repmgr_version_confirmation_args conf; DBT vi; + REP *rep; + REPMGR_SITE *site; char *hostname; u_int32_t flags; - int ret; + int ret, subord; db_rep = env->rep_handle; + rep = db_rep->region; if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0) return (ret); @@ -1581,14 +2122,37 @@ read_version_response(env, conn) return (DB_REP_UNAVAIL); } - if ((ret = accept_handshake(env, conn, hostname)) != 0) + if ((ret = accept_handshake(env, conn, hostname, &subord)) != 0) return (ret); - flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0; + if (!IS_SUBORDINATE(db_rep)) + flags = 0; + else { + flags = REPMGR_SUBORDINATE; + if (FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER) && + db_rep->repmgr_status == running) + /* + * Takeover is enabled in rep-aware subordinate + * process. + */ + flags |= REPMGR_AUTOTAKEOVER; + } if ((ret = __repmgr_send_handshake(env, conn, NULL, 0, flags)) != 0) return (ret); } conn->state = CONN_READY; + site = SITE_FROM_EID(conn->eid); + /* + * Do not increase sites_avail redundantly for a new outgoing + * connection from a subordinate process. + */ + if (conn->type == REP_CONNECTION && + site->state == SITE_CONNECTED && !IS_SUBORDINATE(db_rep)) { + rep->sites_avail++; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "vers_resp: EID %lu CONNECTED, READY. sites_avail %lu", + (u_long)conn->eid, (u_long)rep->sites_avail)); + } return (ret); } @@ -1641,10 +2205,11 @@ __repmgr_find_version_info(env, conn, vi) } static int -accept_handshake(env, conn, hostname) +accept_handshake(env, conn, hostname, subordinate) ENV *env; REPMGR_CONNECTION *conn; char *hostname; + int *subordinate; { __repmgr_handshake_args hs; __repmgr_v2handshake_args hs2; @@ -1653,6 +2218,7 @@ accept_handshake(env, conn, hostname) u_int32_t ack, flags; int electable; + *subordinate = 0; switch (conn->version) { case 2: if (__repmgr_v2handshake_unmarshal(env, &hs2, @@ -1674,6 +2240,8 @@ accept_handshake(env, conn, hostname) ack = 0; break; case 4: + case 5: + case 6: if (__repmgr_handshake_unmarshal(env, &hs, conn->input.repmgr_msg.cntrl.data, conn->input.repmgr_msg.cntrl.size, NULL) != 0) @@ -1682,6 +2250,8 @@ accept_handshake(env, conn, hostname) electable = F_ISSET(&hs, ELECTABLE_SITE); flags = hs.flags; ack = hs.ack_policy; + if (LF_ISSET(REPMGR_SUBORDINATE)) + *subordinate = 1; break; default: __db_errx(env, DB_STR_A("3679", @@ -1729,13 +2299,17 @@ process_parameters(env, conn, host, port, ack, electable, flags) u_int32_t ack, flags; { DB_REP *db_rep; + REP *rep; REPMGR_RETRY *retry; REPMGR_SITE *site; + SITEINFO *sites; __repmgr_connect_reject_args reject; + __repmgr_v4connect_reject_args v4reject; u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE]; int eid, ret; db_rep = env->rep_handle; + rep = db_rep->region; /* Connection state can be used to discern incoming versus outgoing. */ if (conn->state == CONN_CONNECTED) { @@ -1785,6 +2359,13 @@ process_parameters(env, conn, host, port, ack, electable, flags) TAILQ_INSERT_TAIL(&site->sub_conns, conn, entries); conn->eid = eid; + conn->auto_takeover = + LF_ISSET(REPMGR_AUTOTAKEOVER) ? 1 : 0; + SET_LISTENER_CAND(conn->auto_takeover, ++); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "handshake from subordinate %sconnection at site %s:%u EID %u", + LF_ISSET(REPMGR_AUTOTAKEOVER)? + "takeover ": "", host, port, eid)); } else { DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &eid); @@ -1797,6 +2378,7 @@ process_parameters(env, conn, host, port, ack, electable, flags) TAILQ_REMOVE(&db_rep->retries, retry, entries); __os_free(env, retry); + site->ref.retry = NULL; break; case SITE_CONNECTED: /* @@ -1821,6 +2403,16 @@ process_parameters(env, conn, host, port, ack, electable, flags) * don't have to do anything else here. */ break; + case SITE_IDLE: + /* + * This can occur after the heartbeat + * test hook artificially kept this + * site from first trying to connect. + */ + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "handshake from idle site %s:%u EID %u", + host, port, eid)); + break; default: DB_ASSERT(env, FALSE); } @@ -1834,10 +2426,18 @@ process_parameters(env, conn, host, port, ack, electable, flags) RPRINT(env, (env, DB_VERB_REPMGR_MISC, "rejecting connection from unknown or provisional site %s:%u", host, port)); - reject.version = db_rep->membership_version; - reject.gen = db_rep->member_version_gen; - __repmgr_connect_reject_marshal(env, - &reject, reject_buf); + if (conn->version < 5) { + v4reject.version = db_rep->membership_version; + v4reject.gen = db_rep->member_version_gen; + __repmgr_v4connect_reject_marshal(env, + &v4reject, reject_buf); + } else { + reject.version = db_rep->membership_version; + reject.gen = db_rep->member_version_gen; + reject.status = (site) ? site->membership : 0; + __repmgr_connect_reject_marshal(env, + &reject, reject_buf); + } if ((ret = __repmgr_send_own_msg(env, conn, REPMGR_CONNECT_REJECT, reject_buf, @@ -1867,7 +2467,8 @@ process_parameters(env, conn, host, port, ack, electable, flags) */ if (!IS_SUBORDINATE(db_rep) && /* us */ !__repmgr_master_is_known(env) && - !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */ + !LF_ISSET(REPMGR_SUBORDINATE) && /* the remote site */ + !IS_PREFMAS_MODE(env)) { RPRINT(env, (env, DB_VERB_REPMGR_MISC, "handshake with no known master to wake election thread")); db_rep->new_connection = TRUE; @@ -1980,6 +2581,7 @@ record_permlsn(env, conn) */ if (ackp->lsn.file > site->max_ack.file) do_log_check = 1; + site->max_ack_gen = ackp->generation; memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN)); if (do_log_check) check_min_log_file(env); |
