diff options
Diffstat (limited to 'src/repmgr/repmgr_method.c')
| -rw-r--r-- | src/repmgr/repmgr_method.c | 954 |
1 files changed, 706 insertions, 248 deletions
diff --git a/src/repmgr/repmgr_method.c b/src/repmgr/repmgr_method.c index 229cf650..729ba5ff 100644 --- a/src/repmgr/repmgr_method.c +++ b/src/repmgr/repmgr_method.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -29,19 +29,17 @@ static int get_channel_connection __P((CHANNEL *, REPMGR_CONNECTION **)); static int init_dbsite __P((ENV *, int, const char *, u_int, DB_SITE **)); static int join_group_at_site __P((ENV *, repmgr_netaddr_t *)); static int kick_blockers __P((ENV *, REPMGR_CONNECTION *, void *)); -static int make_request_conn __P((ENV *, - repmgr_netaddr_t *, REPMGR_CONNECTION **)); static int set_local_site __P((DB_SITE *, u_int32_t)); -static int read_own_msg __P((ENV *, - REPMGR_CONNECTION *, u_int32_t *, u_int8_t **, size_t *)); static int refresh_site __P((DB_SITE *)); static int __repmgr_await_threads __P((ENV *)); static int __repmgr_build_data_out __P((ENV *, DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp)); static int __repmgr_build_msg_out __P((ENV *, DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp)); +static int __repmgr_demote_site(ENV *, int); static int repmgr_only __P((ENV *, const char *)); static int __repmgr_restart __P((ENV *, int, u_int32_t)); +static int __repmgr_remove_and_close_site __P((DB_SITE *)); static int __repmgr_remove_site __P((DB_SITE *)); static int __repmgr_remove_site_pp __P((DB_SITE *)); static int __repmgr_start_msg_threads __P((ENV *, u_int)); @@ -52,25 +50,21 @@ static int send_msg_self __P((ENV *, REPMGR_IOVECS *, u_int32_t)); static int site_by_addr __P((ENV *, const char *, u_int, DB_SITE **)); /* - * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); + * PUBLIC: int __repmgr_start_pp __P((DB_ENV *, int, u_int32_t)); */ int -__repmgr_start(dbenv, nthreads, flags) +__repmgr_start_pp(dbenv, nthreads, flags) DB_ENV *dbenv; int nthreads; u_int32_t flags; { DB_REP *db_rep; - REP *rep; - REPMGR_SITE *me, *site; - DB_THREAD_INFO *ip; ENV *env; - int first, is_listener, locked, min, need_masterseek, ret, start_master; - u_int i, n; + DB_THREAD_INFO *ip; + int ret; env = dbenv->env; db_rep = env->rep_handle; - rep = db_rep->region; switch (flags) { case 0: @@ -102,7 +96,27 @@ __repmgr_start(dbenv, nthreads, flags) return (EINVAL); } - /* Check if it is a shut-down site, if so, clean the resources. */ + /* A view site cannot be started as MASTER or ELECTION. */ + if (IS_VIEW_SITE(env) && + (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) { + __db_errx(env, DB_STR("3694", + "A view site must be started with DB_REP_CLIENT")); + return (EINVAL); + } + + /* Must start site as client in preferred master mode. */ + if (PREFMAS_IS_SET(env) && + (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) { + __db_errx(env, DB_STR("3702", + "A preferred master site must be started with " + "DB_REP_CLIENT")); + return (EINVAL); + } + + /* + * Check if it is a shut-down site, if so, clean the resources and + * reset the status in order to get ready to start replication. + */ if (db_rep->repmgr_status == stopped) { if ((ret = __repmgr_stop(env)) != 0) { __db_errx(env, DB_STR("3638", @@ -112,7 +126,55 @@ __repmgr_start(dbenv, nthreads, flags) db_rep->repmgr_status = ready; } + /* Record the original configurations given by application. */ + ENV_ENTER(env, ip); db_rep->init_policy = flags; + db_rep->config_nthreads = nthreads; + ret = __repmgr_start_int(env, nthreads, flags); + ENV_LEAVE(env, ip); + return (ret); +} + +/* + * Internal processing to start replication manager. + * + * PUBLIC: int __repmgr_start_int __P((ENV *, int, u_int32_t)); + */ +int +__repmgr_start_int(env, nthreads, flags) + ENV *env; + int nthreads; + u_int32_t flags; +{ + DB_LOG *dblp; + DB_REP *db_rep; + LOG *lp; + REP *rep; + REPMGR_SITE *me, *site; + u_int32_t startopts; + int first, flags_error, is_listener, locked, min; + int need_masterseek, ret, start_master; + u_int i, n; + + db_rep = env->rep_handle; + rep = db_rep->region; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + flags_error = 0; + startopts = 0; + + /* + * For preferred master master site startup, we need to save the + * log location at the end of our previous transactions for + * the lsnhist_match comparisons. Starting repmgr adds a few + * more log records that we don't want to count in lsnhist_match. + */ + if (FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) { + LOG_SYSTEM_LOCK(env); + db_rep->prefmas_init_lsn = lp->lsn; + LOG_SYSTEM_UNLOCK(env); + } + if ((ret = __rep_set_transport_int(env, db_rep->self_eid, __repmgr_send)) != 0) return (ret); @@ -128,7 +190,8 @@ __repmgr_start(dbenv, nthreads, flags) if (db_rep->restored_list != NULL) { ret = __repmgr_refresh_membership(env, - db_rep->restored_list, db_rep->restored_list_length); + db_rep->restored_list, db_rep->restored_list_length, + DB_REPMGR_VERSION); __os_free(env, db_rep->restored_list); db_rep->restored_list = NULL; } else { @@ -145,9 +208,15 @@ __repmgr_start(dbenv, nthreads, flags) * join. */ ret = __repmgr_join_group(env); + else if (VIEW_TO_PARTICIPANT(db_rep, me)) { + __db_errx(env, DB_STR("3695", + "A view site must be started with a view callback")); + return (EINVAL); + } } else if (ret == ENOENT) { - ENV_ENTER(env, ip); - if (FLD_ISSET(me->config, DB_GROUP_CREATOR)) + if (FLD_ISSET(me->config, DB_GROUP_CREATOR) || + (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER))) start_master = TRUE; /* * LEGACY is inconsistent with CREATOR, but start_master @@ -166,10 +235,12 @@ __repmgr_start(dbenv, nthreads, flags) continue; if ((ret = __repmgr_set_membership(env, site->net_addr.host, - site->net_addr.port, - SITE_PRESENT)) != 0) + site->net_addr.port, SITE_PRESENT, + site->gmdb_flags)) != 0) break; - n++; + if (!FLD_ISSET(site->gmdb_flags, + SITE_VIEW)) + n++; } ret = __rep_set_nsites_int(env, n); DB_ASSERT(env, ret == 0); @@ -180,30 +251,27 @@ __repmgr_start(dbenv, nthreads, flags) db_rep->member_version_gen = 1; if ((ret = __repmgr_set_membership(env, me->net_addr.host, me->net_addr.port, - SITE_PRESENT)) == 0) { + SITE_PRESENT, 0)) == 0) { ret = __rep_set_nsites_int(env, 1); DB_ASSERT(env, ret == 0); } UNLOCK_MUTEX(db_rep->mutex); } else ret = __repmgr_join_group(env); - ENV_LEAVE(env, ip); } else if (ret == DB_DELETED) ret = DB_REP_UNAVAIL; } if (ret != 0) return (ret); - DB_ASSERT(env, start_master || - SITE_FROM_EID(db_rep->self_eid)->membership == SITE_PRESENT); - /* - * If we're the first repmgr_start() call, we will have to start threads. - * Therefore, we require a flags value (to tell us how). + * Catch case where user defines a different local site address than + * the one in the restored_list from an ongoing internal init. */ - if (db_rep->repmgr_status != running && flags == 0) { - __db_errx(env, DB_STR("3639", - "a non-zero flags value is required for initial repmgr_start() call")); + if (!start_master && + SITE_FROM_EID(db_rep->self_eid)->membership != SITE_PRESENT) { + __db_errx(env, DB_STR("3696", + "Current local site conflicts with earlier definition")); return (EINVAL); } @@ -214,37 +282,54 @@ __repmgr_start(dbenv, nthreads, flags) * * Then, in case there could be multiple processes, we're either the * main listener process or a subordinate process. On a "subsequent" - * repmgr_start() call we already have enough information to know which - * it is. Otherwise, negotiate with information in the shared region to - * claim the listener role if possible. + * repmgr_start() call, with a running main listener process, we already + * have enough information to know which it is. Otherwise, if there is + * no listener, negotiate with information in the shared region to claim + * the listener role if possible. Once we decide we're the listener, + * mark the listener id in the shared region, so that no other process + * thinks the same thing. * * To avoid a race, once we decide we're in the first call, mark the * handle as started, so that no other thread thinks the same thing. */ + first = FALSE; + is_listener = FALSE; LOCK_MUTEX(db_rep->mutex); locked = TRUE; - if (db_rep->repmgr_status == running) { - first = FALSE; + if (db_rep->repmgr_status == running && !(rep->listener == 0 && + FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER))) is_listener = !IS_SUBORDINATE(db_rep); - } else { + else if (db_rep->repmgr_status != running && + rep->listener == 0 && flags == 0) + flags_error = 1; + else { first = TRUE; db_rep->repmgr_status = running; - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); if (rep->listener == 0) { is_listener = TRUE; - __os_id(dbenv, &rep->listener, NULL); - } else { - is_listener = FALSE; + __os_id(env->dbenv, &rep->listener, NULL); + } else nthreads = 0; - } MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); } UNLOCK_MUTEX(db_rep->mutex); locked = FALSE; + /* + * The first repmgr_start() call for the main listener process + * requires a flags value to tell us how to start up the site. + * But we don't require a flags value for the repmgr_start() + * call for a subordinate process because the site is already + * started and we would only ignore the value anyway. + */ + if (flags_error) { + __db_errx(env, DB_STR("3639", + "A non-zero flags value is required for initial repmgr_start() call")); + return (EINVAL); + } + if (!first) { /* * Subsequent call is allowed when ELECTIONS are turned off, so @@ -266,7 +351,7 @@ __repmgr_start(dbenv, nthreads, flags) /* * The minimum legal number of threads is either 1 or 0, depending upon - * whether we're the main process or a subordinate. + * whether we're the listener process or a subordinate. */ min = is_listener ? 1 : 0; if (nthreads < min) { @@ -303,14 +388,24 @@ __repmgr_start(dbenv, nthreads, flags) * of rep_start calls even within an env region lifetime. */ if (start_master) { - ret = __repmgr_become_master(env); + ret = __repmgr_become_master(env, 0); /* No other repmgr threads running yet. */ DB_ASSERT(env, ret != DB_REP_UNAVAIL); if (ret != 0) goto err; need_masterseek = FALSE; } else { - if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0) + /* + * The preferred master site cannot allow its gen + * to change until it has done its lsnhist_match to + * guarantee that no preferred master transactions + * will be rolled back. + */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) + startopts = REP_START_HOLD_CLIGEN; + if ((ret = __repmgr_repstart(env, + DB_REP_CLIENT, startopts)) != 0) goto err; /* * The repmgr election code starts elections only if @@ -352,6 +447,7 @@ __repmgr_start(dbenv, nthreads, flags) if ((ret = __repmgr_start_msg_threads(env, (u_int)nthreads)) != 0) goto err; + rep->listener_nthreads = (u_int)nthreads; if (need_masterseek) { /* @@ -374,10 +470,47 @@ __repmgr_start(dbenv, nthreads, flags) } UNLOCK_MUTEX(db_rep->mutex); locked = FALSE; + /* + * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only + * do this for the main listener process. For a subordinate + * process, it is always turned on. + */ + rep->inqueue_full_event_on = 1; + } + if (db_rep->selector == NULL) { + /* All processes (even non-listeners) need a select() thread. */ + if ((ret = __repmgr_start_selector(env)) == 0) { + /* + * A view callback is set but this site isn't yet a + * view in the internal site list. Do the view + * demotion here, which will update the internal + * site list. We need the select() thread for the + * demotion because the demotion performs gmdb + * operations. + */ + if (PARTICIPANT_TO_VIEW(db_rep, + SITE_FROM_EID(db_rep->self_eid)) && + (ret = __repmgr_demote_site(env, + db_rep->self_eid)) != 0) + goto err; + return (is_listener ? 0 : DB_REP_IGNORE); + } + } else { + /* + * If the selector thread already exists, the current process + * should be the new listener which has just finished a + * takeover. Now, all active connections need to be refreshed + * to notify remote sites about the new listener. If a new + * connection is established immediately, disable the existing + * main connection to the same site. Otherwise, schedule a + * second immediate attempt. If it still fails, disable the + * main connection and retry a connection as usual. + */ + DB_ASSERT(env, is_listener && + FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER)); + if ((ret = __repmgr_refresh_selector(env)) == 0) + return (0); } - /* All processes (even non-listeners) need a select() thread. */ - if ((ret = __repmgr_start_selector(env)) == 0) - return (is_listener ? 0 : DB_REP_IGNORE); err: /* If we couldn't succeed at everything, undo the parts we did do. */ @@ -392,6 +525,16 @@ err: if (!locked) LOCK_MUTEX(db_rep->mutex); (void)__repmgr_net_close(env); + /* Reset the listener when we fail before having a valid listen_fd. */ + if (first && is_listener) + rep->listener = 0; + /* + * Reset repmgr_status when we fail before starting a selector if the + * earlier call to __repmgr_stop_threads() hasn't already reset it to + * stopped. + */ + if (db_rep->repmgr_status == running) + db_rep->repmgr_status = ready; UNLOCK_MUTEX(db_rep->mutex); return (ret); } @@ -425,6 +568,53 @@ __repmgr_valid_config(env, flags) } /* + * Set priority, heartbeat and election_retry timeouts for preferred master + * mode. Turn on 2SITE_STRICT and ELECTIONS. Can be called whether or not + * REP_ON() is true + * + * PUBLIC: int __repmgr_prefmas_auto_config __P((DB_ENV *, u_int32_t *)); + */ +int __repmgr_prefmas_auto_config (dbenv, config_flags) + DB_ENV *dbenv; + u_int32_t *config_flags; +{ + ENV * env; + db_timeout_t timeout; + int ret; + + env = dbenv->env; + timeout = 0; + + /* Change heartbeat timeouts if they are not already set. */ + if ((ret = __rep_get_timeout(dbenv, + DB_REP_HEARTBEAT_MONITOR, &timeout)) == 0 && + timeout == 0 && (ret = __rep_set_timeout_int(env, + DB_REP_HEARTBEAT_MONITOR, + DB_REPMGR_PREFMAS_HEARTBEAT_MONITOR)) != 0) + return (ret); + if ((ret = __rep_get_timeout(dbenv, + DB_REP_HEARTBEAT_SEND, &timeout)) == 0 && + timeout == 0 && (ret = __rep_set_timeout_int(env, + DB_REP_HEARTBEAT_SEND, DB_REPMGR_PREFMAS_HEARTBEAT_SEND)) != 0) + return (ret); + + /* Change election_retry timeout if it is still the default value. */ + if ((ret = __rep_get_timeout(dbenv, + DB_REP_ELECTION_RETRY, &timeout)) == 0 && + timeout == DB_REPMGR_DEFAULT_ELECTION_RETRY && + (ret = __rep_set_timeout_int(env, + DB_REP_ELECTION_RETRY, DB_REPMGR_PREFMAS_ELECTION_RETRY)) != 0) + return (ret); + + if ((ret = __rep_set_priority_int(env, FLD_ISSET(*config_flags, + REP_C_PREFMAS_MASTER) ? DB_REPMGR_PREFMAS_PRIORITY_MASTER : + DB_REPMGR_PREFMAS_PRIORITY_CLIENT)) != 0) + return (ret); + FLD_SET(*config_flags, REP_C_ELECTIONS | REP_C_2SITE_STRICT); + return (0); +} + +/* * Starts message processing threads. On entry, the actual number of threads * already active is db_rep->nthreads; the desired number of threads is passed * as "n". @@ -473,7 +663,7 @@ __repmgr_restart(env, nthreads, flags) REP *rep; REPMGR_RUNNABLE **th; u_int32_t cur_repflags; - int locked, ret, t_ret; + int locked, ret, role_change, t_ret; u_int delta, i, min, nth; th = NULL; @@ -491,6 +681,7 @@ __repmgr_restart(env, nthreads, flags) } ret = 0; + role_change = 0; db_rep = env->rep_handle; DB_ASSERT(env, REP_ON(env)); rep = db_rep->region; @@ -498,11 +689,14 @@ __repmgr_restart(env, nthreads, flags) cur_repflags = F_ISSET(rep, REP_F_MASTER | REP_F_CLIENT); DB_ASSERT(env, cur_repflags); if (FLD_ISSET(cur_repflags, REP_F_MASTER) && - flags == DB_REP_CLIENT) + flags == DB_REP_CLIENT) { ret = __repmgr_become_client(env); - else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) && - flags == DB_REP_MASTER) - ret = __repmgr_become_master(env); + role_change = 1; + } else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) && + flags == DB_REP_MASTER) { + ret = __repmgr_become_master(env, 0); + role_change = 1; + } if (ret != 0) return (ret); @@ -574,6 +768,9 @@ __repmgr_restart(env, nthreads, flags) } __os_free(env, th); } + /* We will always turn on the inqueue full event after role change. */ + if (role_change) + rep->inqueue_full_event_on = 1; out: if (locked) UNLOCK_MUTEX(db_rep->mutex); @@ -668,7 +865,8 @@ __repmgr_start_selector(env) * PUBLIC: int __repmgr_close __P((ENV *)); * * Close repmgr during env close. It stops repmgr, frees sites array and - * its addresses. + * its addresses. Note that it is possible for the sites array to exist + * and require deallocation independently of whether repmgr was started. */ int __repmgr_close(env) @@ -679,10 +877,15 @@ __repmgr_close(env) int ret; u_int i; - db_rep = env->rep_handle; + if ((db_rep = env->rep_handle) == NULL) + return (0); ret = 0; - ret = __repmgr_stop(env); + /* Stop repmgr and all of its threads if it was previously started. */ + if (IS_ENV_REPLICATED(env)) + ret = __repmgr_stop(env); + + /* Clean up sites array regardless of whether we could stop repmgr. */ if (db_rep->sites != NULL) { for (i = 0; i < db_rep->site_cnt; i++) { site = &db_rep->sites[i]; @@ -756,9 +959,9 @@ __repmgr_set_ack_policy(dbenv, policy) DB_ENV *dbenv; int policy; { + ENV *env; DB_REP *db_rep; DB_THREAD_INFO *ip; - ENV *env; REP *rep; int ret; @@ -823,6 +1026,208 @@ __repmgr_get_ack_policy(dbenv, policy) } /* + * PUBLIC: int __repmgr_set_incoming_queue_max __P((DB_ENV *, u_int32_t, + * PUBLIC: u_int32_t)); + * + * Sets the maximum amount of dynamic memory used by the Replication Manager + * incoming queue. + */ +int +__repmgr_set_incoming_queue_max(dbenv, gbytes, bytes) + DB_ENV *dbenv; + u_int32_t gbytes, bytes; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + ENV *env; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + ENV_NOT_CONFIGURED( + env, db_rep->region, "DB_ENV->repmgr_set_incoming_queue_max", + DB_INIT_REP); + + if (APP_IS_BASEAPI(env)) { + __db_errx(env, "%s %s", + "DB_ENV->repmgr_set_incoming_queue_max:", + "cannot call from base replication application"); + return (EINVAL); + } + + /* + * If the caller provided 0 for the size, the size will be unlimited. + */ + if (gbytes == 0 && bytes == 0) { + gbytes = UINT32_MAX; + bytes = GIGABYTE - 1; + } + + while (bytes >= GIGABYTE) { + bytes -= GIGABYTE; + if (gbytes < UINT32_MAX) + gbytes++; + } + + if (REP_ON(env)) { + ENV_ENTER(env, ip); + MUTEX_LOCK(env, rep->mtx_repmgr); + rep->inqueue_max_gbytes = gbytes; + rep->inqueue_max_bytes = bytes; + __repmgr_set_incoming_queue_redzone(rep, gbytes, bytes); + MUTEX_UNLOCK(env, rep->mtx_repmgr); + ENV_LEAVE(env, ip); + } else { + db_rep->inqueue_max_gbytes = gbytes; + db_rep->inqueue_max_bytes = bytes; + } + + /* + * Setting incoming queue maximum sizes makes this a replication + * manager application. + */ + APP_SET_REPMGR(env); + return (0); +} + +/* + * PUBLIC: int __repmgr_get_incoming_queue_max __P((DB_ENV *, u_int32_t *, + * PUBLIC: u_int32_t *)); + * + * Gets the maximum amount of dynamic memory that can be used by the + * Replicaton Manager incoming queue. + */ +int +__repmgr_get_incoming_queue_max(dbenv, gbytesp, bytesp) + DB_ENV *dbenv; + u_int32_t *gbytesp, *bytesp; +{ + ENV *env; + DB_THREAD_INFO *ip; + DB_REP *db_rep; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + if (REP_ON(env)) { + ENV_ENTER(env, ip); + MUTEX_LOCK(env, rep->mtx_repmgr); + *gbytesp = rep->inqueue_max_gbytes; + *bytesp = rep->inqueue_max_bytes; + MUTEX_UNLOCK(env, rep->mtx_repmgr); + ENV_LEAVE(env, ip); + } else { + *gbytesp = db_rep->inqueue_max_gbytes; + *bytesp = db_rep->inqueue_max_bytes; + } + + return (0); +} + +/* + * PUBLIC: void __repmgr_set_incoming_queue_redzone __P((void *, u_int32_t, + * PUBLIC: u_int32_t)); + * + * Sets the lower bound of the repmgr incoming queue red zone. + * !!! Assumes caller holds mtx_repmgr lock. + * + * Note that we can't simply get the REP* address from the env as we usually do, + * because at the time of this call it may not have been linked into there yet. + * Also note that, REP is not a public structure, so we use "void *" here. + */ +void __repmgr_set_incoming_queue_redzone(rep_, gbytes, bytes) + void *rep_; + u_int32_t gbytes, bytes; +{ + REP *rep; + double rdgbytes, rdbytes; + + rep = rep_; + + /* + * We use 'double' values to do the computation for precision, and + * to avoid overflow. + */ + rdgbytes = gbytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00; + rdbytes = (rdgbytes - (u_int32_t)rdgbytes) * GIGABYTE; + rdbytes += bytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00; + if (rdbytes >= GIGABYTE) { + rdgbytes += 1; + rdbytes -= GIGABYTE; + } + rep->inqueue_rz_gbytes = (u_int32_t)rdgbytes; + rep->inqueue_rz_bytes = (u_int32_t)rdbytes; +} + +/* + * PUBLIC: int __repmgr_get_incoming_queue_redzone __P((DB_ENV *, + * PUBLIC: u_int32_t *, u_int32_t *)); + * + * Gets the lower bound of the repmgr incoming queue red zone. + * This method must be called after environment open. + */ +int __repmgr_get_incoming_queue_redzone(dbenv, gbytesp, bytesp) + DB_ENV *dbenv; + u_int32_t *gbytesp, *bytesp; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + ENV *env; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + ENV_REQUIRES_CONFIG( + env, db_rep->region, "__repmgr_get_incoming_queue_redzone", + DB_INIT_REP); + + ENV_ENTER(env, ip); + MUTEX_LOCK(env, rep->mtx_repmgr); + *gbytesp = rep->inqueue_rz_gbytes; + *bytesp = rep->inqueue_rz_bytes; + MUTEX_UNLOCK(env, rep->mtx_repmgr); + ENV_LEAVE(env, ip); + + return (0); +} + +/* + * PUBLIC: int __repmgr_get_incoming_queue_fullevent __P((DB_ENV *, + * PUBLIC: int *)); + * + * Return whether the DB_EVENT_REP_INQUEUE_FULL event firing is + * turned on or off. + * This method must be called after environment open. + */ +int __repmgr_get_incoming_queue_fullevent(dbenv, onoffp) + DB_ENV *dbenv; + int *onoffp; +{ + DB_REP *db_rep; + ENV *env; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + ENV_REQUIRES_CONFIG( + env, db_rep->region, + "DB_ENV->__repmgr_get_incoming_queue_fullevent", + DB_INIT_REP); + + *onoffp = rep->inqueue_full_event_on ? 1 : 0; + + return (0); +} + +/* * PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *)); */ int @@ -837,7 +1242,13 @@ __repmgr_env_create(env, db_rep) db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; db_rep->config_nsites = 0; + ADJUST_AUTOTAKEOVER_WAITS(db_rep, DB_REPMGR_DEFAULT_ACK_TIMEOUT); db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; + db_rep->inqueue_max_gbytes = 0; + db_rep->inqueue_max_bytes = 0; +#ifdef HAVE_REPLICATION_LISTENER_TAKEOVER + FLD_SET(db_rep->config, REP_C_AUTOTAKEOVER); +#endif FLD_SET(db_rep->config, REP_C_ELECTIONS); FLD_SET(db_rep->config, REP_C_2SITE_STRICT); @@ -846,7 +1257,8 @@ __repmgr_env_create(env, db_rep) TAILQ_INIT(&db_rep->connections); TAILQ_INIT(&db_rep->retries); - db_rep->input_queue.size = 0; + db_rep->input_queue.gbytes = 0; + db_rep->input_queue.bytes = 0; STAILQ_INIT(&db_rep->input_queue.header); __repmgr_env_create_pf(db_rep); @@ -944,6 +1356,15 @@ __repmgr_await_threads(env) * of a connector thread. */ + /* Takeover thread. */ + if (db_rep->takeover_thread != NULL) { + if ((t_ret = __repmgr_thread_join(db_rep->takeover_thread)) != + 0 && ret == 0) + ret = t_ret; + __os_free(env, db_rep->takeover_thread); + db_rep->takeover_thread = NULL; + } + /* Message processing threads. */ for (i = 0; i < db_rep->nthreads && db_rep->messengers[i] != NULL; i++) { @@ -1178,7 +1599,7 @@ get_shared_netaddr(env, eid, netaddr) MUTEX_LOCK(env, rep->mtx_repmgr); if ((u_int)eid >= rep->site_cnt) { - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto err; } DB_ASSERT(env, rep->siteinfo_off != INVALID_ROFF); @@ -1423,7 +1844,7 @@ send_msg_self(env, iovecs, nmsg) u_int32_t nmsg; { REPMGR_MESSAGE *msg; - size_t align, bodysize, structsize; + size_t align, bodysize, msgsize, structsize; u_int8_t *membase; int ret; @@ -1431,10 +1852,12 @@ send_msg_self(env, iovecs, nmsg) bodysize = iovecs->total_bytes - __REPMGR_MSG_HDR_SIZE; structsize = (size_t)DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) + nmsg * sizeof(DBT)), align); - if ((ret = __os_malloc(env, structsize + bodysize, &membase)) != 0) + msgsize = structsize + bodysize; + if ((ret = __os_malloc(env, msgsize, &membase)) != 0) return (ret); msg = (void*)membase; + msg->size = msgsize; membase += structsize; /* @@ -1616,13 +2039,14 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) } ENV_ENTER(env, ip); - ret = get_channel_connection(channel, &conn); - ENV_LEAVE(env, ip); - if (ret != 0) - return (ret); + if ((ret = get_channel_connection(channel, &conn)) != 0) + goto out; - if (conn == NULL) - return (request_self(env, request, nrequest, response, flags)); + /* If conn is NULL, call request_self and then we are done here. */ + if (conn == NULL) { + ret = request_self(env, request, nrequest, response, flags); + goto out; + } /* Find an available array slot, or grow the array if necessary. */ LOCK_MUTEX(db_rep->mutex); @@ -1670,7 +2094,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) LOCK_MUTEX(db_rep->mutex); F_CLR(&conn->responses[i], RESP_IN_USE | RESP_THREAD_WAITING); UNLOCK_MUTEX(db_rep->mutex); - return (ret); + goto out; } timeout = timeout > 0 ? timeout : db_channel->timeout; @@ -1688,7 +2112,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) * to wake up those threads, with a COMPLETE indication and an * error code. That's more than we want to tackle here. */ - return (ret); + goto out; } /* @@ -1732,7 +2156,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) sz = conn->iovecs.vectors[0].iov_len; if ((ret = __os_malloc(env, sz, &dummy)) != 0) - goto out; + goto out_unlck; __repmgr_iovec_init(&conn->iovecs); DB_INIT_DBT(resp->dbt, dummy, sz); __repmgr_add_dbt(&conn->iovecs, &resp->dbt); @@ -1740,8 +2164,9 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) } } -out: +out_unlck: UNLOCK_MUTEX(db_rep->mutex); +out: ENV_LEAVE(env, ip); return (ret); } @@ -2168,6 +2593,7 @@ __repmgr_channel_close(dbchan, flags) { ENV *env; DB_REP *db_rep; + DB_THREAD_INFO *ip; REPMGR_CONNECTION *conn; CHANNEL *channel; u_int32_t i; @@ -2182,6 +2608,7 @@ __repmgr_channel_close(dbchan, flags) * Disable connection(s) (if not already done due to an error having * occurred previously); release our reference to conn struct(s). */ + ENV_ENTER(env, ip); LOCK_MUTEX(db_rep->mutex); if (dbchan->eid >= 0) { conn = channel->c.conn; @@ -2218,6 +2645,7 @@ __repmgr_channel_close(dbchan, flags) __os_free(env, channel); __os_free(env, dbchan); + ENV_LEAVE(env, ip); return (ret); } @@ -2369,29 +2797,26 @@ join_group_at_site(env, addrp) repmgr_netaddr_t *addrp; { DB_REP *db_rep; + REP *rep; REPMGR_CONNECTION *conn; SITE_STRING_BUFFER addr_buf; repmgr_netaddr_t addr, myaddr; __repmgr_gm_fwd_args fwd; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; u_int8_t *p, *response_buf, siteinfo_buf[MAX_MSG_BUF]; char host_buf[MAXHOSTNAMELEN + 1], *host; u_int32_t gen, type; - size_t len; + size_t host_len, msg_len, req_len; int ret, t_ret; db_rep = env->rep_handle; + rep = db_rep->region; LOCK_MUTEX(db_rep->mutex); myaddr = SITE_FROM_EID(db_rep->self_eid)->net_addr; UNLOCK_MUTEX(db_rep->mutex); - len = strlen(myaddr.host) + 1; - DB_INIT_DBT(site_info.host, myaddr.host, len); - site_info.port = myaddr.port; - site_info.flags = 0; - ret = __repmgr_site_info_marshal(env, - &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); - DB_ASSERT(env, ret == 0); + host_len = strlen(myaddr.host) + 1; conn = NULL; response_buf = NULL; @@ -2399,14 +2824,35 @@ join_group_at_site(env, addrp) RPRINT(env, (env, DB_VERB_REPMGR_MISC, "try join request to site %s", __repmgr_format_addr_loc(addrp, addr_buf))); retry: - if ((ret = make_request_conn(env, addrp, &conn)) != 0) + if ((ret = __repmgr_make_request_conn(env, addrp, &conn)) != 0) return (ret); + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); + if (conn->version < 5) { + DB_INIT_DBT(v4site_info.host, myaddr.host, host_len); + v4site_info.port = myaddr.port; + v4site_info.flags = 0; + ret = __repmgr_v4site_info_marshal(env, + &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len); + } else { + DB_INIT_DBT(site_info.host, myaddr.host, host_len); + site_info.port = myaddr.port; + site_info.status = 0; + site_info.flags = 0; + if (IS_VIEW_SITE(env)) + FLD_SET(site_info.flags, SITE_VIEW); + if (rep->priority > 0) + FLD_SET(site_info.flags, SITE_JOIN_ELECTABLE); + ret = __repmgr_site_info_marshal(env, + &site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len); + } + DB_ASSERT(env, ret == 0); + /* Preserve separate request length in case there is a retry. */ if ((ret = __repmgr_send_sync_msg(env, conn, - REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0) + REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)req_len)) != 0) goto err; - if ((ret = read_own_msg(env, - conn, &type, &response_buf, &len)) != 0) + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &msg_len)) != 0) goto err; if (type == REPMGR_GM_FAILURE) { @@ -2429,7 +2875,7 @@ retry: goto err; ret = __repmgr_gm_fwd_unmarshal(env, &fwd, - response_buf, len, &p); + response_buf, msg_len, &p); DB_ASSERT(env, ret == 0); if (fwd.gen > gen) { if (fwd.host.size > MAXHOSTNAMELEN + 1) { @@ -2456,7 +2902,8 @@ retry: } } if (type == REPMGR_JOIN_SUCCESS) - ret = __repmgr_refresh_membership(env, response_buf, len); + ret = __repmgr_refresh_membership(env, response_buf, msg_len, + conn->version); else ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */ @@ -2476,129 +2923,6 @@ err: } /* - * Reads a whole message, when we expect to get a REPMGR_OWN_MSG. - */ -static int -read_own_msg(env, conn, typep, bufp, lenp) - ENV *env; - REPMGR_CONNECTION *conn; - u_int32_t *typep; - u_int8_t **bufp; - size_t *lenp; -{ - __repmgr_msg_hdr_args msg_hdr; - u_int8_t *buf; - u_int32_t type; - size_t size; - int ret; - - __repmgr_reset_for_reading(conn); - if ((ret = __repmgr_read_conn(conn)) != 0) - goto err; - ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, - conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); - DB_ASSERT(env, ret == 0); - - if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) { - ret = DB_REP_UNAVAIL; /* Protocol violation. */ - goto err; - } - type = REPMGR_OWN_MSG_TYPE(msg_hdr); - if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) { - conn->reading_phase = DATA_PHASE; - __repmgr_iovec_init(&conn->iovecs); - - if ((ret = __os_malloc(env, size, &buf)) != 0) - goto err; - conn->input.rep_message = NULL; - - __repmgr_add_buffer(&conn->iovecs, buf, size); - if ((ret = __repmgr_read_conn(conn)) != 0) { - __os_free(env, buf); - goto err; - } - *bufp = buf; - } - - *typep = type; - *lenp = size; - -err: - return (ret); -} - -static int -make_request_conn(env, addr, connp) - ENV *env; - repmgr_netaddr_t *addr; - REPMGR_CONNECTION **connp; -{ - DBT vi; - __repmgr_msg_hdr_args msg_hdr; - __repmgr_version_confirmation_args conf; - REPMGR_CONNECTION *conn; - int alloc, ret, unused; - - alloc = FALSE; - if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0) - return (ret); - conn->type = APP_CONNECTION; - - /* Read a handshake msg, to get version confirmation and parameters. */ - if ((ret = __repmgr_read_conn(conn)) != 0) - goto err; - /* - * We can only get here after having read the full 9 bytes that we - * expect, so this can't fail. - */ - DB_ASSERT(env, conn->reading_phase == SIZES_PHASE); - ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, - conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); - DB_ASSERT(env, ret == 0); - __repmgr_iovec_init(&conn->iovecs); - conn->reading_phase = DATA_PHASE; - - if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0) - goto err; - alloc = TRUE; - - if ((ret = __repmgr_read_conn(conn)) != 0) - goto err; - - /* - * Analyze the handshake msg, and stash relevant info. - */ - if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0) - goto err; - DB_ASSERT(env, vi.size > 0); - if ((ret = __repmgr_version_confirmation_unmarshal(env, - &conf, vi.data, vi.size, NULL)) != 0) - goto err; - - if (conf.version < GM_MIN_VERSION) { - ret = DB_REP_UNAVAIL; - goto err; - } - conn->version = conf.version; - -err: - if (alloc) { - DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0); - __os_free(env, conn->input.repmgr_msg.cntrl.data); - DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0); - __os_free(env, conn->input.repmgr_msg.rec.data); - } - __repmgr_reset_for_reading(conn); - if (ret == 0) - *connp = conn; - else { - (void)__repmgr_close_connection(env, conn); - (void)__repmgr_destroy_conn(env, conn); - } - return (ret); -} - -/* * PUBLIC: int __repmgr_site __P((DB_ENV *, * PUBLIC: const char *, u_int, DB_SITE **, u_int32_t)); */ @@ -2640,9 +2964,9 @@ site_by_addr(env, host, port, sitep) if ((ret = addr_chk(env, host, port)) != 0) return (ret); + ENV_ENTER(env, ip); if (REP_ON(env)) { LOCK_MUTEX(db_rep->mutex); - ENV_ENTER(env, ip); locked = TRUE; } else locked = FALSE; @@ -2654,10 +2978,9 @@ site_by_addr(env, host, port, sitep) * we want the DB_SITE handle to point to; just like site_by_eid() does. */ host = site->net_addr.host; - if (locked) { - ENV_LEAVE(env, ip); + if (locked) UNLOCK_MUTEX(db_rep->mutex); - } + ENV_LEAVE(env, ip); if (ret != 0) return (ret); @@ -2723,7 +3046,7 @@ init_dbsite(env, eid, host, port, sitep) dbsite->get_address = __repmgr_get_site_address; dbsite->get_config = __repmgr_get_config; dbsite->get_eid = __repmgr_get_eid; - dbsite->set_config = __repmgr_site_config; + dbsite->set_config = __repmgr_site_config_pp; dbsite->remove = __repmgr_remove_site_pp; dbsite->close = __repmgr_site_close; @@ -2756,9 +3079,16 @@ __repmgr_get_eid(dbsite, eidp) DB_SITE *dbsite; int *eidp; { + DB_THREAD_INFO *ip; + ENV *env; int ret; - if ((ret = refresh_site(dbsite)) != 0) + env = dbsite->env; + + ENV_ENTER(env, ip); + ret = refresh_site(dbsite); + ENV_LEAVE(env, ip); + if (ret != 0) return (ret); if (F_ISSET(dbsite, DB_SITE_PREOPEN)) { @@ -2791,8 +3121,11 @@ __repmgr_get_config(dbsite, which, valuep) env = dbsite->env; db_rep = env->rep_handle; - if ((ret = refresh_site(dbsite)) != 0) + ENV_ENTER(env, ip); + if ((ret = refresh_site(dbsite)) != 0) { + ENV_LEAVE(env, ip); return (ret); + } LOCK_MUTEX(db_rep->mutex); DB_ASSERT(env, IS_VALID_EID(dbsite->eid)); site = SITE_FROM_EID(dbsite->eid); @@ -2800,32 +3133,52 @@ __repmgr_get_config(dbsite, which, valuep) rep = db_rep->region; infop = env->reginfo; - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); sites = R_ADDR(infop, rep->siteinfo_off); site->config = sites[dbsite->eid].config; MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); } *valuep = FLD_ISSET(site->config, which) ? 1 : 0; UNLOCK_MUTEX(db_rep->mutex); + ENV_LEAVE(env, ip); return (0); } /* - * PUBLIC: int __repmgr_site_config __P((DB_SITE *, u_int32_t, u_int32_t)); + * PUBLIC: int __repmgr_site_config_pp __P((DB_SITE *, u_int32_t, u_int32_t)); */ int -__repmgr_site_config(dbsite, which, value) +__repmgr_site_config_pp(dbsite, which, value) DB_SITE *dbsite; u_int32_t which; u_int32_t value; { - DB_REP *db_rep; DB_THREAD_INFO *ip; ENV *env; + int ret; + + env = dbsite->env; + + ENV_ENTER(env, ip); + ret = __repmgr_site_config_int(dbsite, which, value); + ENV_LEAVE(env, ip); + + return (ret); +} + +/* + * PUBLIC: int __repmgr_site_config_int __P((DB_SITE *, u_int32_t, u_int32_t)); + */ +int +__repmgr_site_config_int(dbsite, which, value) + DB_SITE *dbsite; + u_int32_t which; + u_int32_t value; +{ + DB_REP *db_rep; + ENV *env; REGINFO *infop; REP *rep; REPMGR_SITE *site; @@ -2875,7 +3228,6 @@ __repmgr_site_config(dbsite, which, value) infop = env->reginfo; LOCK_MUTEX(db_rep->mutex); - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); sites = R_ADDR(infop, rep->siteinfo_off); site = SITE_FROM_EID(dbsite->eid); @@ -2896,7 +3248,6 @@ __repmgr_site_config(dbsite, which, value) rep->siteinfo_seq++; } MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); UNLOCK_MUTEX(db_rep->mutex); } else { site = SITE_FROM_EID(dbsite->eid); @@ -2930,7 +3281,6 @@ set_local_site(dbsite, value) if (REP_ON(env)) { rep = db_rep->region; LOCK_MUTEX(db_rep->mutex); - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); locked = TRUE; /* Make sure we're in sync first. */ @@ -2941,31 +3291,32 @@ set_local_site(dbsite, value) __db_errx(env, DB_STR("3666", "A previously given local site may not be unset")); ret = EINVAL; - } else if (IS_VALID_EID(db_rep->self_eid) && - db_rep->self_eid != dbsite->eid) { - __db_errx(env, DB_STR("3667", - "A (different) local site has already been set")); - ret = EINVAL; - } else { - DB_ASSERT(env, IS_VALID_EID(dbsite->eid)); - site = SITE_FROM_EID(dbsite->eid); - if (FLD_ISSET(site->config, - DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) { - __db_errx(env, DB_STR("3668", - "Local site cannot have HELPER or PEER attributes")); + } else if (value) { + if (IS_VALID_EID(db_rep->self_eid) && + db_rep->self_eid != dbsite->eid) { + __db_errx(env, DB_STR("3697", + "A (different) local site has already been set")); ret = EINVAL; + } else { + DB_ASSERT(env, IS_VALID_EID(dbsite->eid)); + site = SITE_FROM_EID(dbsite->eid); + if (FLD_ISSET(site->config, + DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) { + __db_errx(env, DB_STR("3698", + "Local site cannot have HELPER or PEER attributes")); + ret = EINVAL; + } } } - if (ret == 0) { + if (ret == 0 && value) { db_rep->self_eid = dbsite->eid; if (locked) { - rep->self_eid = dbsite->eid; + rep->self_eid = db_rep->self_eid; rep->siteinfo_seq++; } } if (locked) { MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); UNLOCK_MUTEX(db_rep->mutex); } return (ret); @@ -2998,7 +3349,7 @@ refresh_site(dbsite) } static int -__repmgr_remove_site_pp(dbsite) +__repmgr_remove_and_close_site(dbsite) DB_SITE *dbsite; { int ret, t_ret; @@ -3011,6 +3362,23 @@ __repmgr_remove_site_pp(dbsite) */ if ((t_ret = __repmgr_site_close(dbsite)) != 0 && ret == 0) ret = t_ret; + + return (ret); +} + +static int +__repmgr_remove_site_pp(dbsite) + DB_SITE *dbsite; +{ + ENV *env; + DB_THREAD_INFO *ip; + int ret; + + env = dbsite->env; + + ENV_ENTER(env, ip); + ret = __repmgr_remove_and_close_site(dbsite); + ENV_LEAVE(env, ip); return (ret); } @@ -3024,6 +3392,7 @@ __repmgr_remove_site(dbsite) REPMGR_CONNECTION *conn; repmgr_netaddr_t addr; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; u_int8_t *response_buf, siteinfo_buf[MAX_MSG_BUF]; size_t len; u_int32_t type; @@ -3046,23 +3415,33 @@ __repmgr_remove_site(dbsite) DB_ASSERT(env, IS_VALID_EID(master)); addr = SITE_FROM_EID(master)->net_addr; UNLOCK_MUTEX(db_rep->mutex); - len = strlen(dbsite->host) + 1; - DB_INIT_DBT(site_info.host, dbsite->host, len); - site_info.port = dbsite->port; - site_info.flags = 0; - ret = __repmgr_site_info_marshal(env, - &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); - DB_ASSERT(env, ret == 0); conn = NULL; response_buf = NULL; - if ((ret = make_request_conn(env, &addr, &conn)) != 0) + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) return (ret); + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); + if (conn->version < 5) { + DB_INIT_DBT(v4site_info.host, dbsite->host, len); + v4site_info.port = dbsite->port; + v4site_info.flags = 0; + ret = __repmgr_v4site_info_marshal(env, + &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); + } else { + DB_INIT_DBT(site_info.host, dbsite->host, len); + site_info.port = dbsite->port; + site_info.status = 0; + site_info.flags = 0; + ret = __repmgr_site_info_marshal(env, + &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); + } + DB_ASSERT(env, ret == 0); + if ((ret = __repmgr_send_sync_msg(env, conn, REPMGR_REMOVE_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0) goto err; - if ((ret = read_own_msg(env, + if ((ret = __repmgr_read_own_msg(env, conn, &type, &response_buf, &len)) != 0) goto err; ret = type == REPMGR_REMOVE_SUCCESS ? 0 : DB_REP_UNAVAIL; @@ -3090,3 +3469,82 @@ __repmgr_site_close(dbsite) __os_free(dbsite->env, dbsite); return (0); } + +/* + * Demotes a participant site to a view. This is a one-way and one-time + * operation. + * + * The demotion occurs at the very end of repmgr_start() because it + * requires a select thread to perform the gmdb operations that remove + * the site from the replication group and immediately add the site back + * into the group as a view. The demotion also preserves any other threads + * created by repmgr_start() so that they are there to be used by the + * demoted site after it is re-added as a view site. + * + * We remove and re-add the site to propagate the site's change from + * participant to view to all sites in the replication group. This includes + * updates to each site's gmdb and in-memory site list. + */ +#define REPMGR_DEMOTION_MASTER_RETRIES 10 +#define REPMGR_DEMOTION_RETRY_USECS 500000 +static int +__repmgr_demote_site(env, eid) + ENV *env; + int eid; +{ + DB_REP *db_rep; + DB_SITE *dbsite; + REP *rep; + REPMGR_SITE *site; + int ret, t_ret, tries; + + db_rep = env->rep_handle; + rep = db_rep->region; + site = SITE_FROM_EID(eid); + dbsite = NULL; + + /* Inform other repmgr threads that a demotion is in progress. */ + db_rep->demotion_pending = TRUE; + + if ((ret = init_dbsite(env, eid, site->net_addr.host, + site->net_addr.port, &dbsite)) != 0) + goto err; + + /* + * We need a master to perform the gmdb updates. Poll periodically + * for a limited time to find one. + */ + tries = 0; + while (rep->master_id == DB_EID_INVALID) { + __os_yield(env, 0, REPMGR_DEMOTION_RETRY_USECS); + if (++tries >= REPMGR_DEMOTION_MASTER_RETRIES) { + ret = DB_REP_UNAVAIL; + goto err; + } + } + + /* Remove site from replication group. */ + if ((ret = __repmgr_remove_site(dbsite)) != 0) + goto err; + + /* + * Add site back into replication group as a view. This demotion is + * occurring because this site now has a view callback but its + * SITE_VIEW flag is not set. Now, __repmgr_join_group() will detect + * the view callback and set the SITE_VIEW flag before sending this + * site's information to the rest of the replication group. + */ + if ((ret = __repmgr_join_group(env)) != 0) + goto err; + +err: + /* Deallocates dbsite. */ + if (dbsite != NULL) { + t_ret = __repmgr_site_close(dbsite); + if (ret == 0 && t_ret != 0) + ret = t_ret; + } + /* Must reset demotion_pending before leaving this routine. */ + db_rep->demotion_pending = FALSE; + return (ret); +} |
