summaryrefslogtreecommitdiff
path: root/src/repmgr/repmgr_util.c
diff options
context:
space:
mode:
authorLorry Tar Creator <lorry-tar-importer@baserock.org>2015-02-17 17:25:57 +0000
committer <>2015-03-17 16:26:24 +0000
commit780b92ada9afcf1d58085a83a0b9e6bc982203d1 (patch)
tree598f8b9fa431b228d29897e798de4ac0c1d3d970 /src/repmgr/repmgr_util.c
parent7a2660ba9cc2dc03a69ddfcfd95369395cc87444 (diff)
downloadberkeleydb-master.tar.gz
Imported from /home/lorry/working-area/delta_berkeleydb/db-6.1.23.tar.gz.HEADdb-6.1.23master
Diffstat (limited to 'src/repmgr/repmgr_util.c')
-rw-r--r--src/repmgr/repmgr_util.c957
1 files changed, 904 insertions, 53 deletions
diff --git a/src/repmgr/repmgr_util.c b/src/repmgr/repmgr_util.c
index c2439436..1c5ebe59 100644
--- a/src/repmgr/repmgr_util.c
+++ b/src/repmgr/repmgr_util.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -15,9 +15,13 @@
#define INITIAL_SITES_ALLOCATION 3 /* Arbitrary guess. */
+static int convert_gmdb(ENV *, DB_THREAD_INFO *, DB *, DB_TXN *);
static int get_eid __P((ENV *, const char *, u_int, int *));
-static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *));
static int read_gmdb __P((ENV *, DB_THREAD_INFO *, u_int8_t **, size_t *));
+static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *));
+static int __repmgr_find_commit __P((ENV *, DB_LSN *, DB_LSN *, int *));
+static int __repmgr_remote_lsnhist(ENV *, int, u_int32_t,
+ __repmgr_lsnhist_match_args *);
/*
* Schedules a future attempt to re-establish a connection with the given site.
@@ -43,6 +47,8 @@ __repmgr_schedule_connection_attempt(env, eid, immediate)
REP *rep;
REPMGR_RETRY *retry, *target;
REPMGR_SITE *site;
+ SITEINFO *sites;
+ db_timeout_t timeout;
db_timespec t;
int ret;
@@ -57,7 +63,24 @@ __repmgr_schedule_connection_attempt(env, eid, immediate)
if (immediate)
TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
else {
- TIMESPEC_ADD_DB_TIMEOUT(&t, rep->connection_retry_wait);
+ /*
+ * Normally we retry a connection after connection retry
+ * timeout. In a subordinate rep-aware process, we retry sooner
+ * when there is a listener candidate on the disconnected site.
+ * The listener process will be connected from the new listener,
+ * but subordinate rep-aware process can only wait for retry.
+ * It matters when the subordinate process becomes listener and
+ * the disconnected site is master. The m_listener_wait is set
+ * to retry after enough time has passed for a takeover. The
+ * number of listener candidates is maintained in the listener
+ * process as it has connections to all subordinate processes
+ * from other sites.
+ */
+ timeout = rep->connection_retry_wait;
+ CHECK_LISTENER_CAND(timeout, >0, db_rep->m_listener_wait,
+ timeout);
+ TIMESPEC_ADD_DB_TIMEOUT(&t, timeout);
+
/*
* Insert the new "retry" on the (time-ordered) list in its
* proper position. To do so, find the list entry ("target")
@@ -284,6 +307,7 @@ __repmgr_new_site(env, sitep, host, port)
site->net_addr.host = p;
site->net_addr.port = (u_int16_t)port;
+ site->max_ack_gen = 0;
ZERO_LSN(site->max_ack);
site->ack_policy = 0;
site->alignment = 0;
@@ -295,6 +319,7 @@ __repmgr_new_site(env, sitep, host, port)
site->state = SITE_IDLE;
site->membership = 0;
+ site->gmdb_flags = 0;
site->config = 0;
*sitep = site;
@@ -535,11 +560,14 @@ __repmgr_thread_failure(env, why)
int why;
{
DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
db_rep = env->rep_handle;
+ ENV_ENTER(env, ip);
LOCK_MUTEX(db_rep->mutex);
(void)__repmgr_stop_threads(env);
UNLOCK_MUTEX(db_rep->mutex);
+ ENV_LEAVE(env, ip);
return (__env_panic(env, why));
}
@@ -597,12 +625,13 @@ __repmgr_format_addr_loc(addr, buffer)
}
/*
- * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t));
+ * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t, u_int32_t));
*/
int
-__repmgr_repstart(env, flags)
+__repmgr_repstart(env, flags, startopts)
ENV *env;
u_int32_t flags;
+ u_int32_t startopts;
{
DBT my_addr;
int ret;
@@ -610,7 +639,11 @@ __repmgr_repstart(env, flags)
/* Include "cdata" in case sending to old-version site. */
if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0)
return (ret);
- ret = __rep_start_int(env, &my_addr, flags);
+ /*
+ * force_role_chg and hold_client_gen are used by preferred master
+ * mode to help control site startup.
+ */
+ ret = __rep_start_int(env, &my_addr, flags, startopts);
__os_free(env, my_addr.data);
if (ret != 0)
__db_err(env, ret, DB_STR("3673", "rep_start"));
@@ -618,11 +651,12 @@ __repmgr_repstart(env, flags)
}
/*
- * PUBLIC: int __repmgr_become_master __P((ENV *));
+ * PUBLIC: int __repmgr_become_master __P((ENV *, u_int32_t));
*/
int
-__repmgr_become_master(env)
+__repmgr_become_master(env, startopts)
ENV *env;
+ u_int32_t startopts;
{
DB_REP *db_rep;
DB_THREAD_INFO *ip;
@@ -631,7 +665,7 @@ __repmgr_become_master(env)
REPMGR_SITE *site;
DBT key_dbt, data_dbt;
__repmgr_membership_key_args key;
- __repmgr_membership_data_args member_status;
+ __repmgr_membership_data_args member_data;
repmgr_netaddr_t addr;
u_int32_t status;
u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
@@ -668,16 +702,23 @@ __repmgr_become_master(env)
db_rep->client_intent = FALSE;
UNLOCK_MUTEX(db_rep->mutex);
- if ((ret = __repmgr_repstart(env, DB_REP_MASTER)) != 0)
+ if ((ret = __repmgr_repstart(env, DB_REP_MASTER, startopts)) != 0)
return (ret);
+ /*
+ * Make sure member_version_gen is current so that this master
+ * can reject obsolete member lists from other sites.
+ */
+ db_rep->member_version_gen = db_rep->region->gen;
+
+ /* If there is already a gmdb, we are finished. */
if (db_rep->have_gmdb)
return (0);
- db_rep->member_version_gen = db_rep->region->gen;
- ENV_ENTER(env, ip);
+ /* There isn't a gmdb. Create one from the in-memory site list. */
if ((ret = __repmgr_hold_master_role(env, NULL)) != 0)
goto leave;
+ ENV_GET_THREAD_INFO(env, ip);
retry:
if ((ret = __repmgr_setup_gmdb_op(env, ip, &txn, DB_CREATE)) != 0)
goto err;
@@ -705,8 +746,9 @@ retry:
&key, key_buf, sizeof(key_buf), &len);
DB_ASSERT(env, ret == 0);
DB_INIT_DBT(key_dbt, key_buf, len);
- member_status.flags = status;
- __repmgr_membership_data_marshal(env, &member_status, data_buf);
+ member_data.status = status;
+ member_data.flags = site->gmdb_flags;
+ __repmgr_membership_data_marshal(env, &member_data, data_buf);
DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE);
if ((ret = __db_put(dbp, ip, txn, &key_dbt, &data_dbt, 0)) != 0)
goto err;
@@ -726,7 +768,6 @@ err:
if ((t_ret = __repmgr_rlse_master_role(env)) != 0 && ret == 0)
ret = t_ret;
leave:
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -840,6 +881,14 @@ __repmgr_open(env, rep_)
rep->election_retry_wait = db_rep->election_retry_wait;
rep->heartbeat_monitor_timeout = db_rep->heartbeat_monitor_timeout;
rep->heartbeat_frequency = db_rep->heartbeat_frequency;
+ rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes;
+ rep->inqueue_max_bytes = db_rep->inqueue_max_bytes;
+ if (rep->inqueue_max_gbytes == 0 && rep->inqueue_max_bytes == 0) {
+ rep->inqueue_max_bytes = DB_REPMGR_DEFAULT_INQUEUE_MAX;
+ }
+ __repmgr_set_incoming_queue_redzone(rep, rep->inqueue_max_gbytes,
+ rep->inqueue_max_bytes);
+
return (ret);
}
@@ -958,6 +1007,18 @@ __repmgr_join(env, rep_)
}
db_rep->siteinfo_seq = rep->siteinfo_seq;
+ /*
+ * Update the incoming queue limit settings if necessary.
+ */
+ if ((db_rep->inqueue_max_gbytes != 0 ||
+ db_rep->inqueue_max_bytes != 0) &&
+ (db_rep->inqueue_max_gbytes != rep->inqueue_max_gbytes ||
+ db_rep->inqueue_max_bytes != rep->inqueue_max_gbytes)) {
+ rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes;
+ rep->inqueue_max_bytes = db_rep->inqueue_max_bytes;
+ __repmgr_set_incoming_queue_redzone(rep,
+ rep->inqueue_max_gbytes, rep->inqueue_max_bytes);
+ }
unlock:
MUTEX_UNLOCK(env, rep->mtx_repmgr);
return (ret);
@@ -1073,6 +1134,7 @@ __repmgr_share_netaddrs(env, rep_, start, limit)
shared_array[eid].addr.port = db_rep->sites[i].net_addr.port;
shared_array[eid].config = db_rep->sites[i].config;
shared_array[eid].status = db_rep->sites[i].membership;
+ shared_array[eid].flags = db_rep->sites[i].gmdb_flags;
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"EID %d is assigned for site %s:%lu",
eid, host, (u_long)shared_array[eid].addr.port));
@@ -1134,6 +1196,7 @@ __repmgr_copy_in_added_sites(env)
site = SITE_FROM_EID(i);
site->config = p->config;
site->membership = p->status;
+ site->gmdb_flags = p->flags;
}
out:
@@ -1266,7 +1329,9 @@ __repmgr_stable_lsn(env, stable_lsn)
db_rep = env->rep_handle;
rep = db_rep->region;
- if (rep->min_log_file != 0 && rep->min_log_file < stable_lsn->file) {
+ LOCK_MUTEX(db_rep->mutex);
+ if (rep->sites_avail != 0 && rep->min_log_file != 0 &&
+ rep->min_log_file < stable_lsn->file) {
/*
* Returning an LSN to be consistent with the rest of the
* log archiving processing. Construct LSN of format
@@ -1276,12 +1341,91 @@ __repmgr_stable_lsn(env, stable_lsn)
stable_lsn->offset = 0;
}
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
- "Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu]",
- (u_long)stable_lsn->file, (u_long)stable_lsn->offset));
+"Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu] sites_avail %lu min_log %lu",
+ (u_long)stable_lsn->file, (u_long)stable_lsn->offset,
+ (u_long)rep->sites_avail, (u_long)rep->min_log_file));
+ UNLOCK_MUTEX(db_rep->mutex);
return (0);
}
/*
+ * PUBLIC: int __repmgr_make_request_conn __P((ENV *,
+ * PUBLIC: repmgr_netaddr_t *, REPMGR_CONNECTION **));
+ */
+int
+__repmgr_make_request_conn(env, addr, connp)
+ ENV *env;
+ repmgr_netaddr_t *addr;
+ REPMGR_CONNECTION **connp;
+{
+ DBT vi;
+ __repmgr_msg_hdr_args msg_hdr;
+ __repmgr_version_confirmation_args conf;
+ REPMGR_CONNECTION *conn;
+ int alloc, ret, unused;
+
+ alloc = FALSE;
+ if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0)
+ return (ret);
+ conn->type = APP_CONNECTION;
+
+ /* Read a handshake msg, to get version confirmation and parameters. */
+ if ((ret = __repmgr_read_conn(conn)) != 0)
+ goto err;
+ /*
+ * We can only get here after having read the full 9 bytes that we
+ * expect, so this can't fail.
+ */
+ DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
+ ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
+ conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
+ DB_ASSERT(env, ret == 0);
+ __repmgr_iovec_init(&conn->iovecs);
+ conn->reading_phase = DATA_PHASE;
+
+ if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0)
+ goto err;
+ alloc = TRUE;
+
+ if ((ret = __repmgr_read_conn(conn)) != 0)
+ goto err;
+
+ /*
+ * Analyze the handshake msg, and stash relevant info.
+ */
+ if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
+ goto err;
+ DB_ASSERT(env, vi.size > 0);
+ if ((ret = __repmgr_version_confirmation_unmarshal(env,
+ &conf, vi.data, vi.size, NULL)) != 0)
+ goto err;
+
+ if (conf.version < GM_MIN_VERSION ||
+ (IS_VIEW_SITE(env) && conf.version < VIEW_MIN_VERSION) ||
+ (PREFMAS_IS_SET(env) && conf.version < PREFMAS_MIN_VERSION)) {
+ ret = DB_REP_UNAVAIL;
+ goto err;
+ }
+ conn->version = conf.version;
+
+err:
+ if (alloc) {
+ DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0);
+ __os_free(env, conn->input.repmgr_msg.cntrl.data);
+ DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0);
+ __os_free(env, conn->input.repmgr_msg.rec.data);
+ }
+ __repmgr_reset_for_reading(conn);
+ if (ret == 0)
+ *connp = conn;
+ else {
+ (void)__repmgr_close_connection(env, conn);
+ (void)__repmgr_destroy_conn(env, conn);
+ }
+ return (ret);
+}
+
+/*
* PUBLIC: int __repmgr_send_sync_msg __P((ENV *, REPMGR_CONNECTION *,
* PUBLIC: u_int32_t, u_int8_t *, u_int32_t));
*/
@@ -1311,15 +1455,511 @@ __repmgr_send_sync_msg(env, conn, type, buf, len)
}
/*
+ * Reads a whole message, when we expect to get a REPMGR_OWN_MSG.
+ */
+/*
+ * PUBLIC: int __repmgr_read_own_msg __P((ENV *, REPMGR_CONNECTION *,
+ * PUBLIC: u_int32_t *, u_int8_t **, size_t *));
+ */
+int
+__repmgr_read_own_msg(env, conn, typep, bufp, lenp)
+ ENV *env;
+ REPMGR_CONNECTION *conn;
+ u_int32_t *typep;
+ u_int8_t **bufp;
+ size_t *lenp;
+{
+ __repmgr_msg_hdr_args msg_hdr;
+ u_int8_t *buf;
+ u_int32_t type;
+ size_t size;
+ int ret;
+
+ __repmgr_reset_for_reading(conn);
+ if ((ret = __repmgr_read_conn(conn)) != 0)
+ goto err;
+ ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
+ conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
+ DB_ASSERT(env, ret == 0);
+
+ if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) {
+ ret = DB_REP_UNAVAIL; /* Protocol violation. */
+ goto err;
+ }
+ type = REPMGR_OWN_MSG_TYPE(msg_hdr);
+ if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) {
+ conn->reading_phase = DATA_PHASE;
+ __repmgr_iovec_init(&conn->iovecs);
+
+ if ((ret = __os_malloc(env, size, &buf)) != 0)
+ goto err;
+ conn->input.rep_message = NULL;
+
+ __repmgr_add_buffer(&conn->iovecs, buf, size);
+ if ((ret = __repmgr_read_conn(conn)) != 0) {
+ __os_free(env, buf);
+ goto err;
+ }
+ *bufp = buf;
+ }
+
+ *typep = type;
+ *lenp = size;
+
+err:
+ return (ret);
+}
+
+/*
+ * Returns TRUE if we are connected to the other site in a preferred
+ * master replication group, FALSE otherwise.
+ *
+ * PUBLIC: int __repmgr_prefmas_connected __P((ENV *));
+ */
+int
+__repmgr_prefmas_connected(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ REPMGR_SITE *other_site;
+
+ db_rep = env->rep_handle;
+
+ /*
+ * Preferred master mode only has 2 sites, so the other site is
+ * always EID 1.
+ */
+ if (!IS_PREFMAS_MODE(env) || !IS_KNOWN_REMOTE_SITE(1))
+ return (FALSE);
+
+ other_site = SITE_FROM_EID(1);
+ if (other_site->state == SITE_CONNECTED)
+ return (TRUE);
+
+ if ((conn = other_site->ref.conn.in) != NULL &&
+ IS_READY_STATE(conn->state))
+ return (TRUE);
+ if ((conn = other_site->ref.conn.out) != NULL &&
+ IS_READY_STATE(conn->state))
+ return (TRUE);
+
+ return (FALSE);
+}
+
+/*
+ * Used by a preferred master site to restart the remote temporary master
+ * site as a client. This is used to help guarantee that the preferred master
+ * site's transactions are never rolled back.
+ *
+ * PUBLIC: int __repmgr_restart_site_as_client __P((ENV *, int));
+ */
+int
+__repmgr_restart_site_as_client(env, eid)
+ ENV *env;
+ int eid;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ repmgr_netaddr_t addr;
+ u_int32_t type;
+ size_t len;
+ u_int8_t any_value, *response_buf;
+ int ret, t_ret;
+
+ COMPQUIET(any_value, 0);
+ db_rep = env->rep_handle;
+ conn = NULL;
+
+ if (!IS_PREFMAS_MODE(env))
+ return (0);
+
+ LOCK_MUTEX(db_rep->mutex);
+ addr = SITE_FROM_EID(eid)->net_addr;
+ UNLOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
+ return (ret);
+
+ /*
+ * No payload needed, but must send at least a dummy byte for the
+ * other side to recognize that a message has arrived.
+ */
+ if ((ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_RESTART_CLIENT, VOID_STAR_CAST &any_value, 1)) != 0)
+ goto err;
+
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &len)) != 0)
+ goto err;
+ if (type != REPMGR_PREFMAS_SUCCESS) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "restart_site_as_client got unexpected message type %d",
+ type));
+ ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */
+ }
+err:
+ if (conn != NULL) {
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ if ((t_ret = __repmgr_destroy_conn(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ }
+ return (ret);
+}
+
+/*
+ * Used by a preferred master site to make the remote temporary master
+ * site a readonly master. This is used to help preserve all temporary
+ * master transactions.
+ *
+ * PUBLIC: int __repmgr_make_site_readonly_master __P((ENV *, int,
+ * PUBLIC: u_int32_t *, DB_LSN *));
+ */
+int
+__repmgr_make_site_readonly_master(env, eid, gen, sync_lsnp)
+ ENV *env;
+ int eid;
+ u_int32_t *gen;
+ DB_LSN *sync_lsnp;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ repmgr_netaddr_t addr;
+ __repmgr_permlsn_args permlsn;
+ u_int32_t type;
+ size_t len;
+ u_int8_t any_value, *response_buf;
+ int ret, t_ret;
+
+ COMPQUIET(any_value, 0);
+ db_rep = env->rep_handle;
+ conn = NULL;
+ response_buf = NULL;
+ *gen = 0;
+ ZERO_LSN(*sync_lsnp);
+
+ if (!IS_PREFMAS_MODE(env))
+ return (0);
+
+ LOCK_MUTEX(db_rep->mutex);
+ addr = SITE_FROM_EID(eid)->net_addr;
+ UNLOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
+ return (ret);
+
+ /*
+ * No payload needed, but must send at least a dummy byte for the
+ * other side to recognize that a message has arrived.
+ */
+ if ((ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_READONLY_MASTER, VOID_STAR_CAST &any_value, 1)) != 0)
+ goto err;
+
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &len)) != 0)
+ goto err;
+
+ if (type == REPMGR_READONLY_RESPONSE) {
+ if ((ret = __repmgr_permlsn_unmarshal(env,
+ &permlsn, response_buf, len, NULL)) != 0)
+ goto err;
+ *gen = permlsn.generation;
+ *sync_lsnp = permlsn.lsn;
+ } else {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "make_site_readonly_master got unexpected message type %d",
+ type));
+ ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */
+ }
+
+err:
+ if (conn != NULL) {
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ if ((t_ret = __repmgr_destroy_conn(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ }
+ if (response_buf != NULL)
+ __os_free(env, response_buf);
+ return (ret);
+}
+
+/*
+ * Used by a preferred master site to perform the LSN history comparisons to
+ * determine whether there is are continuous or conflicting sets of
+ * transactions between this site and the remote temporary master.
+ *
+ * PUBLIC: int __repmgr_lsnhist_match __P((ENV *,
+ * PUBLIC: DB_THREAD_INFO *, int, int *));
+ */
+int
+__repmgr_lsnhist_match(env, ip, eid, match)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ int eid;
+ int *match;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ __rep_lsn_hist_data_args my_lsnhist;
+ __repmgr_lsnhist_match_args remote_lsnhist;
+ u_int32_t my_gen;
+ int found_commit, ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ *match = FALSE;
+ my_gen = rep->gen;
+ found_commit = FALSE;
+
+ if (!IS_PREFMAS_MODE(env))
+ return (0);
+
+ /* Get local LSN history information for comparison. */
+ if ((ret = __rep_get_lsnhist_data(env, ip, my_gen, &my_lsnhist)) != 0)
+ return (ret);
+
+ /* Get remote LSN history information for comparison. */
+ ret = __repmgr_remote_lsnhist(env, eid, my_gen, &remote_lsnhist);
+
+ /*
+ * If the current gen doesn't exist at the remote site, the match
+ * fails.
+ *
+ * If the remote LSN or timestamp at the current gen doesn't match
+ * ours, we probably had a whack-a-mole situation where each site
+ * as up and down in isolation one or more times and the match fails.
+ *
+ * If the remote LSN for the next generation is lower than this
+ * site's startup LSN and there are any commit operations between
+ * these LSNs, there are conflicting sets of transactions and the
+ * match fails.
+ */
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match my_lsn [%lu][%lu] remote_lsn [%lu][%lu]",
+ (u_long)my_lsnhist.lsn.file, (u_long)my_lsnhist.lsn.offset,
+ (u_long)remote_lsnhist.lsn.file,
+ (u_long)remote_lsnhist.lsn.offset));
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match my_time %lu:%lu remote_time %lu:%lu",
+ (u_long)my_lsnhist.hist_sec, (u_long)my_lsnhist.hist_nsec,
+ (u_long)remote_lsnhist.hist_sec, (u_long)remote_lsnhist.hist_nsec));
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match pminit_lsn [%lu][%lu] next_gen_lsn [%lu][%lu]",
+ (u_long)db_rep->prefmas_init_lsn.file,
+ (u_long)db_rep->prefmas_init_lsn.offset,
+ (u_long)remote_lsnhist.next_gen_lsn.file,
+ (u_long)remote_lsnhist.next_gen_lsn.offset));
+ if (ret != DB_REP_UNAVAIL &&
+ LOG_COMPARE(&my_lsnhist.lsn, &remote_lsnhist.lsn) == 0 &&
+ my_lsnhist.hist_sec == remote_lsnhist.hist_sec &&
+ my_lsnhist.hist_nsec == remote_lsnhist.hist_nsec) {
+ /*
+ * If the remote site doesn't yet have the next gen or if
+ * our startup LSN is <= than the remote next gen LSN, we
+ * have a match.
+ *
+ * Otherwise, our startup LSN is higher than the remote
+ * next gen LSN. If we have any commit operations between
+ * these two LSNs, we have preferred master operations we
+ * must preserve and there is not a match. But if we just
+ * have uncommitted operations between these LSNs it doesn't
+ * matter if they are rolled back, so we call it a match and
+ * try to retain temporary master transactions if possible.
+ */
+ if (IS_ZERO_LSN(remote_lsnhist.next_gen_lsn) ||
+ LOG_COMPARE(&db_rep->prefmas_init_lsn,
+ &remote_lsnhist.next_gen_lsn) <= 0)
+ *match = TRUE;
+ else if ((ret = __repmgr_find_commit(env,
+ &remote_lsnhist.next_gen_lsn,
+ &db_rep->prefmas_init_lsn, &found_commit)) == 0 &&
+ !found_commit) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match !found_commit set match TRUE"));
+ *match = TRUE;
+ }
+ }
+
+ /* Don't return an error if current gen didn't exist at remote site. */
+ if (ret == DB_REP_UNAVAIL)
+ ret = 0;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match match %d returning %d", *match, ret));
+ return (ret);
+}
+
+/*
+ * Checks a range of log records from low_lsn to high_lsn for any
+ * commit operations. Sets found_commit to TRUE if a commit is
+ * found.
+ */
+static int
+__repmgr_find_commit(env, low_lsn, high_lsn, found_commit)
+ ENV *env;
+ DB_LSN *low_lsn;
+ DB_LSN *high_lsn;
+ int *found_commit;
+{
+ DB_LOGC *logc;
+ DB_LSN lsn;
+ DBT rec;
+ __txn_regop_args *txn_args;
+ u_int32_t rectype;
+ int ret, t_ret;
+
+ *found_commit = FALSE;
+ ret = 0;
+
+ lsn = *low_lsn;
+ if ((ret = __log_cursor(env, &logc)) != 0)
+ return (ret);
+ memset(&rec, 0, sizeof(rec));
+ if (__logc_get(logc, &lsn, &rec, DB_SET) == 0) {
+ do {
+ LOGCOPY_32(env, &rectype, rec.data);
+ if (rectype == DB___txn_regop) {
+ if ((ret = __txn_regop_read(
+ env, rec.data, &txn_args)) != 0)
+ goto close_cursor;
+ if (txn_args->opcode == TXN_COMMIT) {
+ *found_commit = TRUE;
+ __os_free(env, txn_args);
+ break;
+ }
+ __os_free(env, txn_args);
+ }
+ } while ((ret = __logc_get(logc, &lsn, &rec, DB_NEXT)) == 0 &&
+ LOG_COMPARE(&lsn, high_lsn) <= 0);
+ }
+close_cursor:
+ if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
+ ret = t_ret;
+ return (ret);
+}
+
+/*
+ * Used by a preferred master site to get remote LSN history information
+ * from the other site in the replication group.
+ */
+static int
+__repmgr_remote_lsnhist(env, eid, gen, lsnhist_match)
+ ENV *env;
+ int eid;
+ u_int32_t gen;
+ __repmgr_lsnhist_match_args *lsnhist_match;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ repmgr_netaddr_t addr;
+ __rep_lsn_hist_key_args lsnhist_key;
+ u_int8_t lsnhist_key_buf[__REP_LSN_HIST_KEY_SIZE];
+ u_int32_t type;
+ size_t len;
+ u_int8_t *response_buf;
+ int ret, t_ret;
+
+ db_rep = env->rep_handle;
+ conn = NULL;
+ response_buf = NULL;
+
+ if (!IS_KNOWN_REMOTE_SITE(eid))
+ return (0);
+
+ LOCK_MUTEX(db_rep->mutex);
+ addr = SITE_FROM_EID(eid)->net_addr;
+ UNLOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
+ return (ret);
+
+ /* Marshal generation for which to request remote lsnhist data. */
+ lsnhist_key.version = REP_LSN_HISTORY_FMT_VERSION;
+ lsnhist_key.gen = gen;
+ __rep_lsn_hist_key_marshal(env, &lsnhist_key, lsnhist_key_buf);
+ if ((ret = __repmgr_send_sync_msg(env, conn, REPMGR_LSNHIST_REQUEST,
+ lsnhist_key_buf, sizeof(lsnhist_key_buf))) != 0)
+ goto err;
+
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &len)) != 0)
+ goto err;
+
+ /* Unmarshal remote lsnhist time and LSNs for comparison. */
+ if (type == REPMGR_LSNHIST_RESPONSE) {
+ if ((ret = __repmgr_lsnhist_match_unmarshal(env, lsnhist_match,
+ response_buf, __REPMGR_LSNHIST_MATCH_SIZE, NULL)) != 0)
+ goto err;
+ } else {
+ /*
+ * If the other site sent back REPMGR_PREFMAS_FAILURE, it means
+ * no lsnhist record for the requested gen was found on other
+ * site.
+ */
+ if (type != REPMGR_PREFMAS_FAILURE)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "remote_lsnhist got unexpected message type %d",
+ type));
+ ret = DB_REP_UNAVAIL;
+ }
+
+err:
+ if (conn != NULL) {
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ if ((t_ret = __repmgr_destroy_conn(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ }
+ if (response_buf != NULL)
+ __os_free(env, response_buf);
+ return (ret);
+}
+
+/*
+ * Returns the number of tries and the amount of time to yield the
+ * processor for preferred master waits. The total wait is the larger
+ * of 2 seconds or 3 * ack_timeout.
+ *
+ * PUBLIC: int __repmgr_prefmas_get_wait __P((ENV *, u_int32_t *, u_long *));
+ */
+int
+__repmgr_prefmas_get_wait(env, tries, yield_usecs)
+ ENV *env;
+ u_int32_t *tries;
+ u_long *yield_usecs;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ db_timeout_t max_wait;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ *yield_usecs = 250000;
+ max_wait = DB_REPMGR_DEFAULT_ACK_TIMEOUT * 2;
+ if ((rep->ack_timeout * 3) > max_wait)
+ max_wait = rep->ack_timeout * 3;
+ *tries = max_wait / (u_int32_t)*yield_usecs;
+ return (0);
+}
+
+/*
* Produce a membership list from the known info currently in memory.
*
- * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int8_t **, size_t *));
+ * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int32_t,
+ * PUBLIC: u_int8_t **, size_t *));
*
* Caller must hold mutex.
*/
int
-__repmgr_marshal_member_list(env, bufp, lenp)
+__repmgr_marshal_member_list(env, msg_version, bufp, lenp)
ENV *env;
+ u_int32_t msg_version;
u_int8_t **bufp;
size_t *lenp;
{
@@ -1328,6 +1968,7 @@ __repmgr_marshal_member_list(env, bufp, lenp)
REPMGR_SITE *site;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
u_int8_t *buf, *p;
size_t bufsize, len;
u_int i;
@@ -1353,14 +1994,24 @@ __repmgr_marshal_member_list(env, bufp, lenp)
if (site->membership == 0)
continue;
- site_info.host.data = site->net_addr.host;
- site_info.host.size =
- (u_int32_t)strlen(site->net_addr.host) + 1;
- site_info.port = site->net_addr.port;
- site_info.flags = site->membership;
-
- ret = __repmgr_site_info_marshal(env,
- &site_info, p, (size_t)(&buf[bufsize]-p), &len);
+ if (msg_version < 5) {
+ v4site_info.host.data = site->net_addr.host;
+ v4site_info.host.size =
+ (u_int32_t)strlen(site->net_addr.host) + 1;
+ v4site_info.port = site->net_addr.port;
+ v4site_info.flags = site->membership;
+ ret = __repmgr_v4site_info_marshal(env,
+ &v4site_info, p, (size_t)(&buf[bufsize]-p), &len);
+ } else {
+ site_info.host.data = site->net_addr.host;
+ site_info.host.size =
+ (u_int32_t)strlen(site->net_addr.host) + 1;
+ site_info.port = site->net_addr.port;
+ site_info.status = site->membership;
+ site_info.flags = site->gmdb_flags;
+ ret = __repmgr_site_info_marshal(env,
+ &site_info, p, (size_t)(&buf[bufsize]-p), &len);
+ }
DB_ASSERT(env, ret == 0);
p += len;
}
@@ -1387,7 +2038,7 @@ read_gmdb(env, ip, bufp, lenp)
DBC *dbc;
DBT key_dbt, data_dbt;
__repmgr_membership_key_args key;
- __repmgr_membership_data_args member_status;
+ __repmgr_membership_data_args member_data;
__repmgr_member_metadata_args metadata;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
@@ -1435,8 +2086,13 @@ read_gmdb(env, ip, bufp, lenp)
ret = __repmgr_member_metadata_unmarshal(env,
&metadata, metadata_buf, data_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
- DB_ASSERT(env, metadata.format == REPMGR_GMDB_FMT_VERSION);
+ DB_ASSERT(env, metadata.format >= REPMGR_GMDB_FMT_MIN_VERSION &&
+ metadata.format <= REPMGR_GMDB_FMT_VERSION);
DB_ASSERT(env, metadata.version > 0);
+ /* Automatic conversion of old format gmdb if needed. */
+ if (metadata.format < REPMGR_GMDB_FMT_VERSION &&
+ (ret = convert_gmdb(env, ip, dbp, txn)) != 0)
+ goto err;
bufsize = 1000; /* Initial guess. */
if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
@@ -1459,13 +2115,14 @@ read_gmdb(env, ip, bufp, lenp)
DB_ASSERT(env, key.port > 0);
ret = __repmgr_membership_data_unmarshal(env,
- &member_status, data_buf, data_dbt.size, NULL);
+ &member_data, data_buf, data_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
- DB_ASSERT(env, member_status.flags != 0);
+ DB_ASSERT(env, member_data.status != 0);
site_info.host = key.host;
site_info.port = key.port;
- site_info.flags = member_status.flags;
+ site_info.status = member_data.status;
+ site_info.flags = member_data.flags;
if ((ret = __repmgr_site_info_marshal(env, &site_info,
p, (size_t)(&buf[bufsize]-p), &len)) == ENOMEM) {
bufsize *= 2;
@@ -1501,28 +2158,129 @@ err:
}
/*
+ * Convert an older-format group membership database into the current format.
+ */
+static int
+convert_gmdb(env, ip, dbp, txn)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ DB *dbp;
+ DB_TXN *txn;
+{
+ DBC *dbc;
+ DBT key_dbt, data_dbt, v4data_dbt;
+ __repmgr_membership_key_args key;
+ __repmgr_membership_data_args member_data;
+ __repmgr_v4membership_data_args v4member_data;
+ __repmgr_member_metadata_args metadata;
+ u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
+ u_int8_t key_buf[MAX_MSG_BUF];
+ u_int8_t metadata_buf[__REPMGR_MEMBER_METADATA_SIZE];
+ u_int8_t v4data_buf[__REPMGR_V4MEMBERSHIP_DATA_SIZE];
+ int ret, t_ret;
+
+ dbc = NULL;
+
+ if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
+ goto err;
+
+ memset(&key_dbt, 0, sizeof(key_dbt));
+ key_dbt.data = key_buf;
+ key_dbt.ulen = sizeof(key_buf);
+ F_SET(&key_dbt, DB_DBT_USERMEM);
+ memset(&data_dbt, 0, sizeof(data_dbt));
+ data_dbt.data = metadata_buf;
+ data_dbt.ulen = sizeof(metadata_buf);
+ F_SET(&data_dbt, DB_DBT_USERMEM);
+ memset(&v4data_dbt, 0, sizeof(v4data_dbt));
+ v4data_dbt.data = v4data_buf;
+ v4data_dbt.ulen = sizeof(v4data_buf);
+ F_SET(&v4data_dbt, DB_DBT_USERMEM);
+
+ /*
+ * The first gmdb record is a special metadata record that contains
+ * an empty key and gmdb metadata (format and version) and has already
+ * been validated by the caller. We need to update its format value
+ * for this conversion but leave the version alone.
+ */
+ if ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) != 0)
+ goto err;
+ ret = __repmgr_membership_key_unmarshal(env,
+ &key, key_buf, key_dbt.size, NULL);
+ DB_ASSERT(env, ret == 0);
+ DB_ASSERT(env, key.host.size == 0);
+ DB_ASSERT(env, key.port == 0);
+ ret = __repmgr_member_metadata_unmarshal(env,
+ &metadata, metadata_buf, data_dbt.size, NULL);
+ DB_ASSERT(env, ret == 0);
+ DB_ASSERT(env, metadata.version > 0);
+ metadata.format = REPMGR_GMDB_FMT_VERSION;
+ __repmgr_member_metadata_marshal(env, &metadata, metadata_buf);
+ DB_INIT_DBT(data_dbt, metadata_buf, __REPMGR_MEMBER_METADATA_SIZE);
+ if ((ret = __dbc_put(dbc, &key_dbt, &data_dbt, DB_CURRENT)) != 0)
+ goto err;
+
+ /*
+ * The rest of the gmdb records contain a key (host and port) and
+ * membership data (status and now flags). But the old format was
+ * using flags for the status value, so we need to transfer the
+ * old flags value to status and provide an empty flags value for
+ * this conversion.
+ */
+ data_dbt.data = data_buf;
+ data_dbt.ulen = sizeof(data_buf);
+ while ((ret = __dbc_get(dbc, &key_dbt, &v4data_dbt, DB_NEXT)) == 0) {
+ /* Get membership data in old format. */
+ ret = __repmgr_v4membership_data_unmarshal(env,
+ &v4member_data, v4data_buf, v4data_dbt.size, NULL);
+ DB_ASSERT(env, ret == 0);
+ DB_ASSERT(env, v4member_data.flags != 0);
+
+ /* Convert membership data into current format and update. */
+ member_data.status = v4member_data.flags;
+ member_data.flags = 0;
+ __repmgr_membership_data_marshal(env, &member_data, data_buf);
+ DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE);
+ if ((ret = __dbc_put(dbc,
+ &key_dbt, &data_dbt, DB_CURRENT)) != 0)
+ goto err;
+ }
+ if (ret == DB_NOTFOUND)
+ ret = 0;
+
+err:
+ if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
+ ret = t_ret;
+ return (ret);
+}
+
+/*
* Refresh our sites array from the given membership list.
*
* PUBLIC: int __repmgr_refresh_membership __P((ENV *,
- * PUBLIC: u_int8_t *, size_t));
+ * PUBLIC: u_int8_t *, size_t, u_int32_t));
*/
int
-__repmgr_refresh_membership(env, buf, len)
+__repmgr_refresh_membership(env, buf, len, version)
ENV *env;
u_int8_t *buf;
size_t len;
+ u_int32_t version;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_SITE *site;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
char *host;
u_int8_t *p;
u_int16_t port;
- u_int32_t i, n;
+ u_int32_t i, participants;
int eid, ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
/*
* Membership list consists of membr_vers followed by a number of
@@ -1546,9 +2304,17 @@ __repmgr_refresh_membership(env, buf, len)
for (i = 0; i < db_rep->site_cnt; i++)
F_CLR(SITE_FROM_EID(i), SITE_TOUCHED);
- for (n = 0; p < &buf[len]; ++n) {
- ret = __repmgr_site_info_unmarshal(env,
- &site_info, p, (size_t)(&buf[len] - p), &p);
+ for (participants = 0; p < &buf[len]; ) {
+ if (version < 5) {
+ ret = __repmgr_v4site_info_unmarshal(env,
+ &v4site_info, p, (size_t)(&buf[len] - p), &p);
+ site_info.host = v4site_info.host;
+ site_info.port = v4site_info.port;
+ site_info.status = v4site_info.flags;
+ site_info.flags = 0;
+ } else
+ ret = __repmgr_site_info_unmarshal(env,
+ &site_info, p, (size_t)(&buf[len] - p), &p);
DB_ASSERT(env, ret == 0);
host = site_info.host.data;
@@ -1556,9 +2322,11 @@ __repmgr_refresh_membership(env, buf, len)
(u_int8_t*)site_info.host.data + site_info.host.size <= p);
host[site_info.host.size-1] = '\0';
port = site_info.port;
+ if (!FLD_ISSET(site_info.flags, SITE_VIEW))
+ participants++;
if ((ret = __repmgr_set_membership(env,
- host, port, site_info.flags)) != 0)
+ host, port, site_info.status, site_info.flags)) != 0)
goto err;
if ((ret = __repmgr_find_site(env, host, port, &eid)) != 0)
@@ -1566,8 +2334,13 @@ __repmgr_refresh_membership(env, buf, len)
DB_ASSERT(env, IS_VALID_EID(eid));
F_SET(SITE_FROM_EID(eid), SITE_TOUCHED);
}
- ret = __rep_set_nsites_int(env, n);
+ ret = __rep_set_nsites_int(env, participants);
DB_ASSERT(env, ret == 0);
+ if (FLD_ISSET(rep->config,
+ REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT) &&
+ rep->config_nsites > 2)
+ __db_errx(env, DB_STR("3703",
+ "More than two sites in preferred master replication group"));
/* Scan "touched" flags so as to notice sites that have been removed. */
for (i = 0; i < db_rep->site_cnt; i++) {
@@ -1576,7 +2349,8 @@ __repmgr_refresh_membership(env, buf, len)
continue;
host = site->net_addr.host;
port = site->net_addr.port;
- if ((ret = __repmgr_set_membership(env, host, port, 0)) != 0)
+ if ((ret = __repmgr_set_membership(env, host, port,
+ 0, site->gmdb_flags)) != 0)
goto err;
}
@@ -1597,13 +2371,13 @@ __repmgr_reload_gmdb(env)
size_t len;
int ret;
- ENV_ENTER(env, ip);
+ ENV_GET_THREAD_INFO(env, ip);
if ((ret = read_gmdb(env, ip, &buf, &len)) == 0) {
env->rep_handle->have_gmdb = TRUE;
- ret = __repmgr_refresh_membership(env, buf, len);
+ ret = __repmgr_refresh_membership(env, buf, len,
+ DB_REPMGR_VERSION);
__os_free(env, buf);
}
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -1650,7 +2424,8 @@ __repmgr_init_save(env, dbt)
dbt->data = NULL;
dbt->size = 0;
ret = 0;
- } else if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) == 0) {
+ } else if ((ret = __repmgr_marshal_member_list(env,
+ DB_REPMGR_VERSION, &buf, &len)) == 0) {
dbt->data = buf;
dbt->size = (u_int32_t)len;
}
@@ -1700,6 +2475,7 @@ __repmgr_defer_op(env, op)
*/
if ((ret = __os_calloc(env, 1, sizeof(*msg), &msg)) != 0)
return (ret);
+ msg->size = sizeof(*msg);
msg->msg_hdr.type = REPMGR_OWN_MSG;
REPMGR_OWN_MSG_TYPE(msg->msg_hdr) = op;
ret = __repmgr_queue_put(env, msg);
@@ -1771,7 +2547,7 @@ __repmgr_become_client(env)
if ((ret = __repmgr_await_gmdbop(env)) == 0)
db_rep->client_intent = TRUE;
UNLOCK_MUTEX(db_rep->mutex);
- return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT) : ret);
+ return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT, 0) : ret);
}
/*
@@ -1897,16 +2673,17 @@ get_eid(env, host, port, eidp)
* accordingly.
*
* PUBLIC: int __repmgr_set_membership __P((ENV *,
- * PUBLIC: const char *, u_int, u_int32_t));
+ * PUBLIC: const char *, u_int, u_int32_t, u_int32_t));
*
* Caller must host db_rep mutex, and be in ENV_ENTER context.
*/
int
-__repmgr_set_membership(env, host, port, status)
+__repmgr_set_membership(env, host, port, status, flags)
ENV *env;
const char *host;
u_int port;
u_int32_t status;
+ u_int32_t flags;
{
DB_REP *db_rep;
REP *rep;
@@ -1953,7 +2730,9 @@ __repmgr_set_membership(env, host, port, status)
/* Set both private and shared copies of the info. */
site->membership = status;
+ site->gmdb_flags = flags;
sites[eid].status = status;
+ sites[eid].flags = flags;
}
MUTEX_UNLOCK(env, rep->mtx_repmgr);
@@ -1965,7 +2744,8 @@ __repmgr_set_membership(env, host, port, status)
SELECTOR_RUNNING(db_rep)) {
if (eid == db_rep->self_eid && status != SITE_PRESENT)
- ret = DB_DELETED;
+ ret = (status == SITE_ADDING) ?
+ __repmgr_defer_op(env, REPMGR_REJOIN) : DB_DELETED;
else if (orig != SITE_PRESENT && status == SITE_PRESENT &&
site->state == SITE_IDLE) {
/*
@@ -1981,10 +2761,11 @@ __repmgr_set_membership(env, host, port, status)
* failure shouldn't hurt anything, because we'll just
* naturally try again later.
*/
- ret = __repmgr_schedule_connection_attempt(env,
- eid, TRUE);
- if (eid != db_rep->self_eid)
+ if (eid != db_rep->self_eid) {
+ ret = __repmgr_schedule_connection_attempt(env,
+ eid, TRUE);
DB_EVENT(env, DB_EVENT_REP_SITE_ADDED, &eid);
+ }
} else if (orig != 0 && status == 0)
DB_EVENT(env, DB_EVENT_REP_SITE_REMOVED, &eid);
@@ -2084,3 +2865,73 @@ __repmgr_bcast_own_msg(env, type, buf, len)
}
return (0);
}
+
+/*
+ * PUBLIC: int __repmgr_bcast_member_list __P((ENV *));
+ *
+ * Broadcast membership list to all other sites in the replication group.
+ *
+ * Caller must hold mutex.
+ */
+int
+__repmgr_bcast_member_list(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ REPMGR_SITE *site;
+ u_int8_t *buf, *v4buf;
+ size_t len, v4len;
+ int ret;
+ u_int i;
+
+ db_rep = env->rep_handle;
+ if (!SELECTOR_RUNNING(db_rep))
+ return (0);
+ buf = NULL;
+ v4buf = NULL;
+ LOCK_MUTEX(db_rep->mutex);
+ /*
+ * Some of the other sites in the replication group might be at
+ * an older version, so we need to be able to send the membership
+ * list in the current or older format.
+ */
+ if ((ret = __repmgr_marshal_member_list(env,
+ DB_REPMGR_VERSION, &buf, &len)) != 0 ||
+ (ret = __repmgr_marshal_member_list(env,
+ 4, &v4buf, &v4len)) != 0) {
+ UNLOCK_MUTEX(db_rep->mutex);
+ goto out;
+ }
+ UNLOCK_MUTEX(db_rep->mutex);
+
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Broadcast latest membership list"));
+ FOR_EACH_REMOTE_SITE_INDEX(i) {
+ site = SITE_FROM_EID(i);
+ if (site->state != SITE_CONNECTED)
+ continue;
+ if ((conn = site->ref.conn.in) != NULL &&
+ conn->state == CONN_READY &&
+ (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING,
+ (conn->version < 5 ? v4buf : buf),
+ (conn->version < 5 ? (u_int32_t) v4len : (u_int32_t)len)))
+ != 0 &&
+ (ret = __repmgr_bust_connection(env, conn)) != 0)
+ goto out;
+ if ((conn = site->ref.conn.out) != NULL &&
+ conn->state == CONN_READY &&
+ (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING,
+ (conn->version < 5 ? v4buf : buf),
+ (conn->version < 5 ? (u_int32_t)v4len : (u_int32_t)len)))
+ != 0 &&
+ (ret = __repmgr_bust_connection(env, conn)) != 0)
+ goto out;
+ }
+out:
+ if (buf != NULL)
+ __os_free(env, buf);
+ if (v4buf != NULL)
+ __os_free(env, v4buf);
+ return (ret);
+}