diff options
| author | Lorry Tar Creator <lorry-tar-importer@baserock.org> | 2015-02-17 17:25:57 +0000 |
|---|---|---|
| committer | <> | 2015-03-17 16:26:24 +0000 |
| commit | 780b92ada9afcf1d58085a83a0b9e6bc982203d1 (patch) | |
| tree | 598f8b9fa431b228d29897e798de4ac0c1d3d970 /src/repmgr/repmgr_util.c | |
| parent | 7a2660ba9cc2dc03a69ddfcfd95369395cc87444 (diff) | |
| download | berkeleydb-master.tar.gz | |
Diffstat (limited to 'src/repmgr/repmgr_util.c')
| -rw-r--r-- | src/repmgr/repmgr_util.c | 957 |
1 files changed, 904 insertions, 53 deletions
diff --git a/src/repmgr/repmgr_util.c b/src/repmgr/repmgr_util.c index c2439436..1c5ebe59 100644 --- a/src/repmgr/repmgr_util.c +++ b/src/repmgr/repmgr_util.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -15,9 +15,13 @@ #define INITIAL_SITES_ALLOCATION 3 /* Arbitrary guess. */ +static int convert_gmdb(ENV *, DB_THREAD_INFO *, DB *, DB_TXN *); static int get_eid __P((ENV *, const char *, u_int, int *)); -static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *)); static int read_gmdb __P((ENV *, DB_THREAD_INFO *, u_int8_t **, size_t *)); +static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *)); +static int __repmgr_find_commit __P((ENV *, DB_LSN *, DB_LSN *, int *)); +static int __repmgr_remote_lsnhist(ENV *, int, u_int32_t, + __repmgr_lsnhist_match_args *); /* * Schedules a future attempt to re-establish a connection with the given site. @@ -43,6 +47,8 @@ __repmgr_schedule_connection_attempt(env, eid, immediate) REP *rep; REPMGR_RETRY *retry, *target; REPMGR_SITE *site; + SITEINFO *sites; + db_timeout_t timeout; db_timespec t; int ret; @@ -57,7 +63,24 @@ __repmgr_schedule_connection_attempt(env, eid, immediate) if (immediate) TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries); else { - TIMESPEC_ADD_DB_TIMEOUT(&t, rep->connection_retry_wait); + /* + * Normally we retry a connection after connection retry + * timeout. In a subordinate rep-aware process, we retry sooner + * when there is a listener candidate on the disconnected site. + * The listener process will be connected from the new listener, + * but subordinate rep-aware process can only wait for retry. + * It matters when the subordinate process becomes listener and + * the disconnected site is master. The m_listener_wait is set + * to retry after enough time has passed for a takeover. The + * number of listener candidates is maintained in the listener + * process as it has connections to all subordinate processes + * from other sites. + */ + timeout = rep->connection_retry_wait; + CHECK_LISTENER_CAND(timeout, >0, db_rep->m_listener_wait, + timeout); + TIMESPEC_ADD_DB_TIMEOUT(&t, timeout); + /* * Insert the new "retry" on the (time-ordered) list in its * proper position. To do so, find the list entry ("target") @@ -284,6 +307,7 @@ __repmgr_new_site(env, sitep, host, port) site->net_addr.host = p; site->net_addr.port = (u_int16_t)port; + site->max_ack_gen = 0; ZERO_LSN(site->max_ack); site->ack_policy = 0; site->alignment = 0; @@ -295,6 +319,7 @@ __repmgr_new_site(env, sitep, host, port) site->state = SITE_IDLE; site->membership = 0; + site->gmdb_flags = 0; site->config = 0; *sitep = site; @@ -535,11 +560,14 @@ __repmgr_thread_failure(env, why) int why; { DB_REP *db_rep; + DB_THREAD_INFO *ip; db_rep = env->rep_handle; + ENV_ENTER(env, ip); LOCK_MUTEX(db_rep->mutex); (void)__repmgr_stop_threads(env); UNLOCK_MUTEX(db_rep->mutex); + ENV_LEAVE(env, ip); return (__env_panic(env, why)); } @@ -597,12 +625,13 @@ __repmgr_format_addr_loc(addr, buffer) } /* - * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t)); + * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t, u_int32_t)); */ int -__repmgr_repstart(env, flags) +__repmgr_repstart(env, flags, startopts) ENV *env; u_int32_t flags; + u_int32_t startopts; { DBT my_addr; int ret; @@ -610,7 +639,11 @@ __repmgr_repstart(env, flags) /* Include "cdata" in case sending to old-version site. */ if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0) return (ret); - ret = __rep_start_int(env, &my_addr, flags); + /* + * force_role_chg and hold_client_gen are used by preferred master + * mode to help control site startup. + */ + ret = __rep_start_int(env, &my_addr, flags, startopts); __os_free(env, my_addr.data); if (ret != 0) __db_err(env, ret, DB_STR("3673", "rep_start")); @@ -618,11 +651,12 @@ __repmgr_repstart(env, flags) } /* - * PUBLIC: int __repmgr_become_master __P((ENV *)); + * PUBLIC: int __repmgr_become_master __P((ENV *, u_int32_t)); */ int -__repmgr_become_master(env) +__repmgr_become_master(env, startopts) ENV *env; + u_int32_t startopts; { DB_REP *db_rep; DB_THREAD_INFO *ip; @@ -631,7 +665,7 @@ __repmgr_become_master(env) REPMGR_SITE *site; DBT key_dbt, data_dbt; __repmgr_membership_key_args key; - __repmgr_membership_data_args member_status; + __repmgr_membership_data_args member_data; repmgr_netaddr_t addr; u_int32_t status; u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE]; @@ -668,16 +702,23 @@ __repmgr_become_master(env) db_rep->client_intent = FALSE; UNLOCK_MUTEX(db_rep->mutex); - if ((ret = __repmgr_repstart(env, DB_REP_MASTER)) != 0) + if ((ret = __repmgr_repstart(env, DB_REP_MASTER, startopts)) != 0) return (ret); + /* + * Make sure member_version_gen is current so that this master + * can reject obsolete member lists from other sites. + */ + db_rep->member_version_gen = db_rep->region->gen; + + /* If there is already a gmdb, we are finished. */ if (db_rep->have_gmdb) return (0); - db_rep->member_version_gen = db_rep->region->gen; - ENV_ENTER(env, ip); + /* There isn't a gmdb. Create one from the in-memory site list. */ if ((ret = __repmgr_hold_master_role(env, NULL)) != 0) goto leave; + ENV_GET_THREAD_INFO(env, ip); retry: if ((ret = __repmgr_setup_gmdb_op(env, ip, &txn, DB_CREATE)) != 0) goto err; @@ -705,8 +746,9 @@ retry: &key, key_buf, sizeof(key_buf), &len); DB_ASSERT(env, ret == 0); DB_INIT_DBT(key_dbt, key_buf, len); - member_status.flags = status; - __repmgr_membership_data_marshal(env, &member_status, data_buf); + member_data.status = status; + member_data.flags = site->gmdb_flags; + __repmgr_membership_data_marshal(env, &member_data, data_buf); DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE); if ((ret = __db_put(dbp, ip, txn, &key_dbt, &data_dbt, 0)) != 0) goto err; @@ -726,7 +768,6 @@ err: if ((t_ret = __repmgr_rlse_master_role(env)) != 0 && ret == 0) ret = t_ret; leave: - ENV_LEAVE(env, ip); return (ret); } @@ -840,6 +881,14 @@ __repmgr_open(env, rep_) rep->election_retry_wait = db_rep->election_retry_wait; rep->heartbeat_monitor_timeout = db_rep->heartbeat_monitor_timeout; rep->heartbeat_frequency = db_rep->heartbeat_frequency; + rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes; + rep->inqueue_max_bytes = db_rep->inqueue_max_bytes; + if (rep->inqueue_max_gbytes == 0 && rep->inqueue_max_bytes == 0) { + rep->inqueue_max_bytes = DB_REPMGR_DEFAULT_INQUEUE_MAX; + } + __repmgr_set_incoming_queue_redzone(rep, rep->inqueue_max_gbytes, + rep->inqueue_max_bytes); + return (ret); } @@ -958,6 +1007,18 @@ __repmgr_join(env, rep_) } db_rep->siteinfo_seq = rep->siteinfo_seq; + /* + * Update the incoming queue limit settings if necessary. + */ + if ((db_rep->inqueue_max_gbytes != 0 || + db_rep->inqueue_max_bytes != 0) && + (db_rep->inqueue_max_gbytes != rep->inqueue_max_gbytes || + db_rep->inqueue_max_bytes != rep->inqueue_max_gbytes)) { + rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes; + rep->inqueue_max_bytes = db_rep->inqueue_max_bytes; + __repmgr_set_incoming_queue_redzone(rep, + rep->inqueue_max_gbytes, rep->inqueue_max_bytes); + } unlock: MUTEX_UNLOCK(env, rep->mtx_repmgr); return (ret); @@ -1073,6 +1134,7 @@ __repmgr_share_netaddrs(env, rep_, start, limit) shared_array[eid].addr.port = db_rep->sites[i].net_addr.port; shared_array[eid].config = db_rep->sites[i].config; shared_array[eid].status = db_rep->sites[i].membership; + shared_array[eid].flags = db_rep->sites[i].gmdb_flags; RPRINT(env, (env, DB_VERB_REPMGR_MISC, "EID %d is assigned for site %s:%lu", eid, host, (u_long)shared_array[eid].addr.port)); @@ -1134,6 +1196,7 @@ __repmgr_copy_in_added_sites(env) site = SITE_FROM_EID(i); site->config = p->config; site->membership = p->status; + site->gmdb_flags = p->flags; } out: @@ -1266,7 +1329,9 @@ __repmgr_stable_lsn(env, stable_lsn) db_rep = env->rep_handle; rep = db_rep->region; - if (rep->min_log_file != 0 && rep->min_log_file < stable_lsn->file) { + LOCK_MUTEX(db_rep->mutex); + if (rep->sites_avail != 0 && rep->min_log_file != 0 && + rep->min_log_file < stable_lsn->file) { /* * Returning an LSN to be consistent with the rest of the * log archiving processing. Construct LSN of format @@ -1276,12 +1341,91 @@ __repmgr_stable_lsn(env, stable_lsn) stable_lsn->offset = 0; } RPRINT(env, (env, DB_VERB_REPMGR_MISC, - "Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu]", - (u_long)stable_lsn->file, (u_long)stable_lsn->offset)); +"Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu] sites_avail %lu min_log %lu", + (u_long)stable_lsn->file, (u_long)stable_lsn->offset, + (u_long)rep->sites_avail, (u_long)rep->min_log_file)); + UNLOCK_MUTEX(db_rep->mutex); return (0); } /* + * PUBLIC: int __repmgr_make_request_conn __P((ENV *, + * PUBLIC: repmgr_netaddr_t *, REPMGR_CONNECTION **)); + */ +int +__repmgr_make_request_conn(env, addr, connp) + ENV *env; + repmgr_netaddr_t *addr; + REPMGR_CONNECTION **connp; +{ + DBT vi; + __repmgr_msg_hdr_args msg_hdr; + __repmgr_version_confirmation_args conf; + REPMGR_CONNECTION *conn; + int alloc, ret, unused; + + alloc = FALSE; + if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0) + return (ret); + conn->type = APP_CONNECTION; + + /* Read a handshake msg, to get version confirmation and parameters. */ + if ((ret = __repmgr_read_conn(conn)) != 0) + goto err; + /* + * We can only get here after having read the full 9 bytes that we + * expect, so this can't fail. + */ + DB_ASSERT(env, conn->reading_phase == SIZES_PHASE); + ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, + conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); + DB_ASSERT(env, ret == 0); + __repmgr_iovec_init(&conn->iovecs); + conn->reading_phase = DATA_PHASE; + + if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0) + goto err; + alloc = TRUE; + + if ((ret = __repmgr_read_conn(conn)) != 0) + goto err; + + /* + * Analyze the handshake msg, and stash relevant info. + */ + if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0) + goto err; + DB_ASSERT(env, vi.size > 0); + if ((ret = __repmgr_version_confirmation_unmarshal(env, + &conf, vi.data, vi.size, NULL)) != 0) + goto err; + + if (conf.version < GM_MIN_VERSION || + (IS_VIEW_SITE(env) && conf.version < VIEW_MIN_VERSION) || + (PREFMAS_IS_SET(env) && conf.version < PREFMAS_MIN_VERSION)) { + ret = DB_REP_UNAVAIL; + goto err; + } + conn->version = conf.version; + +err: + if (alloc) { + DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0); + __os_free(env, conn->input.repmgr_msg.cntrl.data); + DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0); + __os_free(env, conn->input.repmgr_msg.rec.data); + } + __repmgr_reset_for_reading(conn); + if (ret == 0) + *connp = conn; + else { + (void)__repmgr_close_connection(env, conn); + (void)__repmgr_destroy_conn(env, conn); + } + return (ret); +} + +/* * PUBLIC: int __repmgr_send_sync_msg __P((ENV *, REPMGR_CONNECTION *, * PUBLIC: u_int32_t, u_int8_t *, u_int32_t)); */ @@ -1311,15 +1455,511 @@ __repmgr_send_sync_msg(env, conn, type, buf, len) } /* + * Reads a whole message, when we expect to get a REPMGR_OWN_MSG. + */ +/* + * PUBLIC: int __repmgr_read_own_msg __P((ENV *, REPMGR_CONNECTION *, + * PUBLIC: u_int32_t *, u_int8_t **, size_t *)); + */ +int +__repmgr_read_own_msg(env, conn, typep, bufp, lenp) + ENV *env; + REPMGR_CONNECTION *conn; + u_int32_t *typep; + u_int8_t **bufp; + size_t *lenp; +{ + __repmgr_msg_hdr_args msg_hdr; + u_int8_t *buf; + u_int32_t type; + size_t size; + int ret; + + __repmgr_reset_for_reading(conn); + if ((ret = __repmgr_read_conn(conn)) != 0) + goto err; + ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, + conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); + DB_ASSERT(env, ret == 0); + + if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) { + ret = DB_REP_UNAVAIL; /* Protocol violation. */ + goto err; + } + type = REPMGR_OWN_MSG_TYPE(msg_hdr); + if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) { + conn->reading_phase = DATA_PHASE; + __repmgr_iovec_init(&conn->iovecs); + + if ((ret = __os_malloc(env, size, &buf)) != 0) + goto err; + conn->input.rep_message = NULL; + + __repmgr_add_buffer(&conn->iovecs, buf, size); + if ((ret = __repmgr_read_conn(conn)) != 0) { + __os_free(env, buf); + goto err; + } + *bufp = buf; + } + + *typep = type; + *lenp = size; + +err: + return (ret); +} + +/* + * Returns TRUE if we are connected to the other site in a preferred + * master replication group, FALSE otherwise. + * + * PUBLIC: int __repmgr_prefmas_connected __P((ENV *)); + */ +int +__repmgr_prefmas_connected(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + REPMGR_SITE *other_site; + + db_rep = env->rep_handle; + + /* + * Preferred master mode only has 2 sites, so the other site is + * always EID 1. + */ + if (!IS_PREFMAS_MODE(env) || !IS_KNOWN_REMOTE_SITE(1)) + return (FALSE); + + other_site = SITE_FROM_EID(1); + if (other_site->state == SITE_CONNECTED) + return (TRUE); + + if ((conn = other_site->ref.conn.in) != NULL && + IS_READY_STATE(conn->state)) + return (TRUE); + if ((conn = other_site->ref.conn.out) != NULL && + IS_READY_STATE(conn->state)) + return (TRUE); + + return (FALSE); +} + +/* + * Used by a preferred master site to restart the remote temporary master + * site as a client. This is used to help guarantee that the preferred master + * site's transactions are never rolled back. + * + * PUBLIC: int __repmgr_restart_site_as_client __P((ENV *, int)); + */ +int +__repmgr_restart_site_as_client(env, eid) + ENV *env; + int eid; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + repmgr_netaddr_t addr; + u_int32_t type; + size_t len; + u_int8_t any_value, *response_buf; + int ret, t_ret; + + COMPQUIET(any_value, 0); + db_rep = env->rep_handle; + conn = NULL; + + if (!IS_PREFMAS_MODE(env)) + return (0); + + LOCK_MUTEX(db_rep->mutex); + addr = SITE_FROM_EID(eid)->net_addr; + UNLOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) + return (ret); + + /* + * No payload needed, but must send at least a dummy byte for the + * other side to recognize that a message has arrived. + */ + if ((ret = __repmgr_send_sync_msg(env, conn, + REPMGR_RESTART_CLIENT, VOID_STAR_CAST &any_value, 1)) != 0) + goto err; + + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &len)) != 0) + goto err; + if (type != REPMGR_PREFMAS_SUCCESS) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "restart_site_as_client got unexpected message type %d", + type)); + ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */ + } +err: + if (conn != NULL) { + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret != 0) + ret = t_ret; + if ((t_ret = __repmgr_destroy_conn(env, + conn)) != 0 && ret != 0) + ret = t_ret; + } + return (ret); +} + +/* + * Used by a preferred master site to make the remote temporary master + * site a readonly master. This is used to help preserve all temporary + * master transactions. + * + * PUBLIC: int __repmgr_make_site_readonly_master __P((ENV *, int, + * PUBLIC: u_int32_t *, DB_LSN *)); + */ +int +__repmgr_make_site_readonly_master(env, eid, gen, sync_lsnp) + ENV *env; + int eid; + u_int32_t *gen; + DB_LSN *sync_lsnp; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + repmgr_netaddr_t addr; + __repmgr_permlsn_args permlsn; + u_int32_t type; + size_t len; + u_int8_t any_value, *response_buf; + int ret, t_ret; + + COMPQUIET(any_value, 0); + db_rep = env->rep_handle; + conn = NULL; + response_buf = NULL; + *gen = 0; + ZERO_LSN(*sync_lsnp); + + if (!IS_PREFMAS_MODE(env)) + return (0); + + LOCK_MUTEX(db_rep->mutex); + addr = SITE_FROM_EID(eid)->net_addr; + UNLOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) + return (ret); + + /* + * No payload needed, but must send at least a dummy byte for the + * other side to recognize that a message has arrived. + */ + if ((ret = __repmgr_send_sync_msg(env, conn, + REPMGR_READONLY_MASTER, VOID_STAR_CAST &any_value, 1)) != 0) + goto err; + + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &len)) != 0) + goto err; + + if (type == REPMGR_READONLY_RESPONSE) { + if ((ret = __repmgr_permlsn_unmarshal(env, + &permlsn, response_buf, len, NULL)) != 0) + goto err; + *gen = permlsn.generation; + *sync_lsnp = permlsn.lsn; + } else { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "make_site_readonly_master got unexpected message type %d", + type)); + ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */ + } + +err: + if (conn != NULL) { + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret != 0) + ret = t_ret; + if ((t_ret = __repmgr_destroy_conn(env, + conn)) != 0 && ret != 0) + ret = t_ret; + } + if (response_buf != NULL) + __os_free(env, response_buf); + return (ret); +} + +/* + * Used by a preferred master site to perform the LSN history comparisons to + * determine whether there is are continuous or conflicting sets of + * transactions between this site and the remote temporary master. + * + * PUBLIC: int __repmgr_lsnhist_match __P((ENV *, + * PUBLIC: DB_THREAD_INFO *, int, int *)); + */ +int +__repmgr_lsnhist_match(env, ip, eid, match) + ENV *env; + DB_THREAD_INFO *ip; + int eid; + int *match; +{ + DB_REP *db_rep; + REP *rep; + __rep_lsn_hist_data_args my_lsnhist; + __repmgr_lsnhist_match_args remote_lsnhist; + u_int32_t my_gen; + int found_commit, ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + *match = FALSE; + my_gen = rep->gen; + found_commit = FALSE; + + if (!IS_PREFMAS_MODE(env)) + return (0); + + /* Get local LSN history information for comparison. */ + if ((ret = __rep_get_lsnhist_data(env, ip, my_gen, &my_lsnhist)) != 0) + return (ret); + + /* Get remote LSN history information for comparison. */ + ret = __repmgr_remote_lsnhist(env, eid, my_gen, &remote_lsnhist); + + /* + * If the current gen doesn't exist at the remote site, the match + * fails. + * + * If the remote LSN or timestamp at the current gen doesn't match + * ours, we probably had a whack-a-mole situation where each site + * as up and down in isolation one or more times and the match fails. + * + * If the remote LSN for the next generation is lower than this + * site's startup LSN and there are any commit operations between + * these LSNs, there are conflicting sets of transactions and the + * match fails. + */ + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match my_lsn [%lu][%lu] remote_lsn [%lu][%lu]", + (u_long)my_lsnhist.lsn.file, (u_long)my_lsnhist.lsn.offset, + (u_long)remote_lsnhist.lsn.file, + (u_long)remote_lsnhist.lsn.offset)); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match my_time %lu:%lu remote_time %lu:%lu", + (u_long)my_lsnhist.hist_sec, (u_long)my_lsnhist.hist_nsec, + (u_long)remote_lsnhist.hist_sec, (u_long)remote_lsnhist.hist_nsec)); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match pminit_lsn [%lu][%lu] next_gen_lsn [%lu][%lu]", + (u_long)db_rep->prefmas_init_lsn.file, + (u_long)db_rep->prefmas_init_lsn.offset, + (u_long)remote_lsnhist.next_gen_lsn.file, + (u_long)remote_lsnhist.next_gen_lsn.offset)); + if (ret != DB_REP_UNAVAIL && + LOG_COMPARE(&my_lsnhist.lsn, &remote_lsnhist.lsn) == 0 && + my_lsnhist.hist_sec == remote_lsnhist.hist_sec && + my_lsnhist.hist_nsec == remote_lsnhist.hist_nsec) { + /* + * If the remote site doesn't yet have the next gen or if + * our startup LSN is <= than the remote next gen LSN, we + * have a match. + * + * Otherwise, our startup LSN is higher than the remote + * next gen LSN. If we have any commit operations between + * these two LSNs, we have preferred master operations we + * must preserve and there is not a match. But if we just + * have uncommitted operations between these LSNs it doesn't + * matter if they are rolled back, so we call it a match and + * try to retain temporary master transactions if possible. + */ + if (IS_ZERO_LSN(remote_lsnhist.next_gen_lsn) || + LOG_COMPARE(&db_rep->prefmas_init_lsn, + &remote_lsnhist.next_gen_lsn) <= 0) + *match = TRUE; + else if ((ret = __repmgr_find_commit(env, + &remote_lsnhist.next_gen_lsn, + &db_rep->prefmas_init_lsn, &found_commit)) == 0 && + !found_commit) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match !found_commit set match TRUE")); + *match = TRUE; + } + } + + /* Don't return an error if current gen didn't exist at remote site. */ + if (ret == DB_REP_UNAVAIL) + ret = 0; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match match %d returning %d", *match, ret)); + return (ret); +} + +/* + * Checks a range of log records from low_lsn to high_lsn for any + * commit operations. Sets found_commit to TRUE if a commit is + * found. + */ +static int +__repmgr_find_commit(env, low_lsn, high_lsn, found_commit) + ENV *env; + DB_LSN *low_lsn; + DB_LSN *high_lsn; + int *found_commit; +{ + DB_LOGC *logc; + DB_LSN lsn; + DBT rec; + __txn_regop_args *txn_args; + u_int32_t rectype; + int ret, t_ret; + + *found_commit = FALSE; + ret = 0; + + lsn = *low_lsn; + if ((ret = __log_cursor(env, &logc)) != 0) + return (ret); + memset(&rec, 0, sizeof(rec)); + if (__logc_get(logc, &lsn, &rec, DB_SET) == 0) { + do { + LOGCOPY_32(env, &rectype, rec.data); + if (rectype == DB___txn_regop) { + if ((ret = __txn_regop_read( + env, rec.data, &txn_args)) != 0) + goto close_cursor; + if (txn_args->opcode == TXN_COMMIT) { + *found_commit = TRUE; + __os_free(env, txn_args); + break; + } + __os_free(env, txn_args); + } + } while ((ret = __logc_get(logc, &lsn, &rec, DB_NEXT)) == 0 && + LOG_COMPARE(&lsn, high_lsn) <= 0); + } +close_cursor: + if ((t_ret = __logc_close(logc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* + * Used by a preferred master site to get remote LSN history information + * from the other site in the replication group. + */ +static int +__repmgr_remote_lsnhist(env, eid, gen, lsnhist_match) + ENV *env; + int eid; + u_int32_t gen; + __repmgr_lsnhist_match_args *lsnhist_match; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + repmgr_netaddr_t addr; + __rep_lsn_hist_key_args lsnhist_key; + u_int8_t lsnhist_key_buf[__REP_LSN_HIST_KEY_SIZE]; + u_int32_t type; + size_t len; + u_int8_t *response_buf; + int ret, t_ret; + + db_rep = env->rep_handle; + conn = NULL; + response_buf = NULL; + + if (!IS_KNOWN_REMOTE_SITE(eid)) + return (0); + + LOCK_MUTEX(db_rep->mutex); + addr = SITE_FROM_EID(eid)->net_addr; + UNLOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) + return (ret); + + /* Marshal generation for which to request remote lsnhist data. */ + lsnhist_key.version = REP_LSN_HISTORY_FMT_VERSION; + lsnhist_key.gen = gen; + __rep_lsn_hist_key_marshal(env, &lsnhist_key, lsnhist_key_buf); + if ((ret = __repmgr_send_sync_msg(env, conn, REPMGR_LSNHIST_REQUEST, + lsnhist_key_buf, sizeof(lsnhist_key_buf))) != 0) + goto err; + + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &len)) != 0) + goto err; + + /* Unmarshal remote lsnhist time and LSNs for comparison. */ + if (type == REPMGR_LSNHIST_RESPONSE) { + if ((ret = __repmgr_lsnhist_match_unmarshal(env, lsnhist_match, + response_buf, __REPMGR_LSNHIST_MATCH_SIZE, NULL)) != 0) + goto err; + } else { + /* + * If the other site sent back REPMGR_PREFMAS_FAILURE, it means + * no lsnhist record for the requested gen was found on other + * site. + */ + if (type != REPMGR_PREFMAS_FAILURE) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "remote_lsnhist got unexpected message type %d", + type)); + ret = DB_REP_UNAVAIL; + } + +err: + if (conn != NULL) { + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret != 0) + ret = t_ret; + if ((t_ret = __repmgr_destroy_conn(env, + conn)) != 0 && ret != 0) + ret = t_ret; + } + if (response_buf != NULL) + __os_free(env, response_buf); + return (ret); +} + +/* + * Returns the number of tries and the amount of time to yield the + * processor for preferred master waits. The total wait is the larger + * of 2 seconds or 3 * ack_timeout. + * + * PUBLIC: int __repmgr_prefmas_get_wait __P((ENV *, u_int32_t *, u_long *)); + */ +int +__repmgr_prefmas_get_wait(env, tries, yield_usecs) + ENV *env; + u_int32_t *tries; + u_long *yield_usecs; +{ + DB_REP *db_rep; + REP *rep; + db_timeout_t max_wait; + + db_rep = env->rep_handle; + rep = db_rep->region; + + *yield_usecs = 250000; + max_wait = DB_REPMGR_DEFAULT_ACK_TIMEOUT * 2; + if ((rep->ack_timeout * 3) > max_wait) + max_wait = rep->ack_timeout * 3; + *tries = max_wait / (u_int32_t)*yield_usecs; + return (0); +} + +/* * Produce a membership list from the known info currently in memory. * - * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int8_t **, size_t *)); + * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int32_t, + * PUBLIC: u_int8_t **, size_t *)); * * Caller must hold mutex. */ int -__repmgr_marshal_member_list(env, bufp, lenp) +__repmgr_marshal_member_list(env, msg_version, bufp, lenp) ENV *env; + u_int32_t msg_version; u_int8_t **bufp; size_t *lenp; { @@ -1328,6 +1968,7 @@ __repmgr_marshal_member_list(env, bufp, lenp) REPMGR_SITE *site; __repmgr_membr_vers_args membr_vers; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; u_int8_t *buf, *p; size_t bufsize, len; u_int i; @@ -1353,14 +1994,24 @@ __repmgr_marshal_member_list(env, bufp, lenp) if (site->membership == 0) continue; - site_info.host.data = site->net_addr.host; - site_info.host.size = - (u_int32_t)strlen(site->net_addr.host) + 1; - site_info.port = site->net_addr.port; - site_info.flags = site->membership; - - ret = __repmgr_site_info_marshal(env, - &site_info, p, (size_t)(&buf[bufsize]-p), &len); + if (msg_version < 5) { + v4site_info.host.data = site->net_addr.host; + v4site_info.host.size = + (u_int32_t)strlen(site->net_addr.host) + 1; + v4site_info.port = site->net_addr.port; + v4site_info.flags = site->membership; + ret = __repmgr_v4site_info_marshal(env, + &v4site_info, p, (size_t)(&buf[bufsize]-p), &len); + } else { + site_info.host.data = site->net_addr.host; + site_info.host.size = + (u_int32_t)strlen(site->net_addr.host) + 1; + site_info.port = site->net_addr.port; + site_info.status = site->membership; + site_info.flags = site->gmdb_flags; + ret = __repmgr_site_info_marshal(env, + &site_info, p, (size_t)(&buf[bufsize]-p), &len); + } DB_ASSERT(env, ret == 0); p += len; } @@ -1387,7 +2038,7 @@ read_gmdb(env, ip, bufp, lenp) DBC *dbc; DBT key_dbt, data_dbt; __repmgr_membership_key_args key; - __repmgr_membership_data_args member_status; + __repmgr_membership_data_args member_data; __repmgr_member_metadata_args metadata; __repmgr_membr_vers_args membr_vers; __repmgr_site_info_args site_info; @@ -1435,8 +2086,13 @@ read_gmdb(env, ip, bufp, lenp) ret = __repmgr_member_metadata_unmarshal(env, &metadata, metadata_buf, data_dbt.size, NULL); DB_ASSERT(env, ret == 0); - DB_ASSERT(env, metadata.format == REPMGR_GMDB_FMT_VERSION); + DB_ASSERT(env, metadata.format >= REPMGR_GMDB_FMT_MIN_VERSION && + metadata.format <= REPMGR_GMDB_FMT_VERSION); DB_ASSERT(env, metadata.version > 0); + /* Automatic conversion of old format gmdb if needed. */ + if (metadata.format < REPMGR_GMDB_FMT_VERSION && + (ret = convert_gmdb(env, ip, dbp, txn)) != 0) + goto err; bufsize = 1000; /* Initial guess. */ if ((ret = __os_malloc(env, bufsize, &buf)) != 0) @@ -1459,13 +2115,14 @@ read_gmdb(env, ip, bufp, lenp) DB_ASSERT(env, key.port > 0); ret = __repmgr_membership_data_unmarshal(env, - &member_status, data_buf, data_dbt.size, NULL); + &member_data, data_buf, data_dbt.size, NULL); DB_ASSERT(env, ret == 0); - DB_ASSERT(env, member_status.flags != 0); + DB_ASSERT(env, member_data.status != 0); site_info.host = key.host; site_info.port = key.port; - site_info.flags = member_status.flags; + site_info.status = member_data.status; + site_info.flags = member_data.flags; if ((ret = __repmgr_site_info_marshal(env, &site_info, p, (size_t)(&buf[bufsize]-p), &len)) == ENOMEM) { bufsize *= 2; @@ -1501,28 +2158,129 @@ err: } /* + * Convert an older-format group membership database into the current format. + */ +static int +convert_gmdb(env, ip, dbp, txn) + ENV *env; + DB_THREAD_INFO *ip; + DB *dbp; + DB_TXN *txn; +{ + DBC *dbc; + DBT key_dbt, data_dbt, v4data_dbt; + __repmgr_membership_key_args key; + __repmgr_membership_data_args member_data; + __repmgr_v4membership_data_args v4member_data; + __repmgr_member_metadata_args metadata; + u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE]; + u_int8_t key_buf[MAX_MSG_BUF]; + u_int8_t metadata_buf[__REPMGR_MEMBER_METADATA_SIZE]; + u_int8_t v4data_buf[__REPMGR_V4MEMBERSHIP_DATA_SIZE]; + int ret, t_ret; + + dbc = NULL; + + if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0) + goto err; + + memset(&key_dbt, 0, sizeof(key_dbt)); + key_dbt.data = key_buf; + key_dbt.ulen = sizeof(key_buf); + F_SET(&key_dbt, DB_DBT_USERMEM); + memset(&data_dbt, 0, sizeof(data_dbt)); + data_dbt.data = metadata_buf; + data_dbt.ulen = sizeof(metadata_buf); + F_SET(&data_dbt, DB_DBT_USERMEM); + memset(&v4data_dbt, 0, sizeof(v4data_dbt)); + v4data_dbt.data = v4data_buf; + v4data_dbt.ulen = sizeof(v4data_buf); + F_SET(&v4data_dbt, DB_DBT_USERMEM); + + /* + * The first gmdb record is a special metadata record that contains + * an empty key and gmdb metadata (format and version) and has already + * been validated by the caller. We need to update its format value + * for this conversion but leave the version alone. + */ + if ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) != 0) + goto err; + ret = __repmgr_membership_key_unmarshal(env, + &key, key_buf, key_dbt.size, NULL); + DB_ASSERT(env, ret == 0); + DB_ASSERT(env, key.host.size == 0); + DB_ASSERT(env, key.port == 0); + ret = __repmgr_member_metadata_unmarshal(env, + &metadata, metadata_buf, data_dbt.size, NULL); + DB_ASSERT(env, ret == 0); + DB_ASSERT(env, metadata.version > 0); + metadata.format = REPMGR_GMDB_FMT_VERSION; + __repmgr_member_metadata_marshal(env, &metadata, metadata_buf); + DB_INIT_DBT(data_dbt, metadata_buf, __REPMGR_MEMBER_METADATA_SIZE); + if ((ret = __dbc_put(dbc, &key_dbt, &data_dbt, DB_CURRENT)) != 0) + goto err; + + /* + * The rest of the gmdb records contain a key (host and port) and + * membership data (status and now flags). But the old format was + * using flags for the status value, so we need to transfer the + * old flags value to status and provide an empty flags value for + * this conversion. + */ + data_dbt.data = data_buf; + data_dbt.ulen = sizeof(data_buf); + while ((ret = __dbc_get(dbc, &key_dbt, &v4data_dbt, DB_NEXT)) == 0) { + /* Get membership data in old format. */ + ret = __repmgr_v4membership_data_unmarshal(env, + &v4member_data, v4data_buf, v4data_dbt.size, NULL); + DB_ASSERT(env, ret == 0); + DB_ASSERT(env, v4member_data.flags != 0); + + /* Convert membership data into current format and update. */ + member_data.status = v4member_data.flags; + member_data.flags = 0; + __repmgr_membership_data_marshal(env, &member_data, data_buf); + DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE); + if ((ret = __dbc_put(dbc, + &key_dbt, &data_dbt, DB_CURRENT)) != 0) + goto err; + } + if (ret == DB_NOTFOUND) + ret = 0; + +err: + if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* * Refresh our sites array from the given membership list. * * PUBLIC: int __repmgr_refresh_membership __P((ENV *, - * PUBLIC: u_int8_t *, size_t)); + * PUBLIC: u_int8_t *, size_t, u_int32_t)); */ int -__repmgr_refresh_membership(env, buf, len) +__repmgr_refresh_membership(env, buf, len, version) ENV *env; u_int8_t *buf; size_t len; + u_int32_t version; { DB_REP *db_rep; + REP *rep; REPMGR_SITE *site; __repmgr_membr_vers_args membr_vers; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; char *host; u_int8_t *p; u_int16_t port; - u_int32_t i, n; + u_int32_t i, participants; int eid, ret; db_rep = env->rep_handle; + rep = db_rep->region; /* * Membership list consists of membr_vers followed by a number of @@ -1546,9 +2304,17 @@ __repmgr_refresh_membership(env, buf, len) for (i = 0; i < db_rep->site_cnt; i++) F_CLR(SITE_FROM_EID(i), SITE_TOUCHED); - for (n = 0; p < &buf[len]; ++n) { - ret = __repmgr_site_info_unmarshal(env, - &site_info, p, (size_t)(&buf[len] - p), &p); + for (participants = 0; p < &buf[len]; ) { + if (version < 5) { + ret = __repmgr_v4site_info_unmarshal(env, + &v4site_info, p, (size_t)(&buf[len] - p), &p); + site_info.host = v4site_info.host; + site_info.port = v4site_info.port; + site_info.status = v4site_info.flags; + site_info.flags = 0; + } else + ret = __repmgr_site_info_unmarshal(env, + &site_info, p, (size_t)(&buf[len] - p), &p); DB_ASSERT(env, ret == 0); host = site_info.host.data; @@ -1556,9 +2322,11 @@ __repmgr_refresh_membership(env, buf, len) (u_int8_t*)site_info.host.data + site_info.host.size <= p); host[site_info.host.size-1] = '\0'; port = site_info.port; + if (!FLD_ISSET(site_info.flags, SITE_VIEW)) + participants++; if ((ret = __repmgr_set_membership(env, - host, port, site_info.flags)) != 0) + host, port, site_info.status, site_info.flags)) != 0) goto err; if ((ret = __repmgr_find_site(env, host, port, &eid)) != 0) @@ -1566,8 +2334,13 @@ __repmgr_refresh_membership(env, buf, len) DB_ASSERT(env, IS_VALID_EID(eid)); F_SET(SITE_FROM_EID(eid), SITE_TOUCHED); } - ret = __rep_set_nsites_int(env, n); + ret = __rep_set_nsites_int(env, participants); DB_ASSERT(env, ret == 0); + if (FLD_ISSET(rep->config, + REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT) && + rep->config_nsites > 2) + __db_errx(env, DB_STR("3703", + "More than two sites in preferred master replication group")); /* Scan "touched" flags so as to notice sites that have been removed. */ for (i = 0; i < db_rep->site_cnt; i++) { @@ -1576,7 +2349,8 @@ __repmgr_refresh_membership(env, buf, len) continue; host = site->net_addr.host; port = site->net_addr.port; - if ((ret = __repmgr_set_membership(env, host, port, 0)) != 0) + if ((ret = __repmgr_set_membership(env, host, port, + 0, site->gmdb_flags)) != 0) goto err; } @@ -1597,13 +2371,13 @@ __repmgr_reload_gmdb(env) size_t len; int ret; - ENV_ENTER(env, ip); + ENV_GET_THREAD_INFO(env, ip); if ((ret = read_gmdb(env, ip, &buf, &len)) == 0) { env->rep_handle->have_gmdb = TRUE; - ret = __repmgr_refresh_membership(env, buf, len); + ret = __repmgr_refresh_membership(env, buf, len, + DB_REPMGR_VERSION); __os_free(env, buf); } - ENV_LEAVE(env, ip); return (ret); } @@ -1650,7 +2424,8 @@ __repmgr_init_save(env, dbt) dbt->data = NULL; dbt->size = 0; ret = 0; - } else if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) == 0) { + } else if ((ret = __repmgr_marshal_member_list(env, + DB_REPMGR_VERSION, &buf, &len)) == 0) { dbt->data = buf; dbt->size = (u_int32_t)len; } @@ -1700,6 +2475,7 @@ __repmgr_defer_op(env, op) */ if ((ret = __os_calloc(env, 1, sizeof(*msg), &msg)) != 0) return (ret); + msg->size = sizeof(*msg); msg->msg_hdr.type = REPMGR_OWN_MSG; REPMGR_OWN_MSG_TYPE(msg->msg_hdr) = op; ret = __repmgr_queue_put(env, msg); @@ -1771,7 +2547,7 @@ __repmgr_become_client(env) if ((ret = __repmgr_await_gmdbop(env)) == 0) db_rep->client_intent = TRUE; UNLOCK_MUTEX(db_rep->mutex); - return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT) : ret); + return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT, 0) : ret); } /* @@ -1897,16 +2673,17 @@ get_eid(env, host, port, eidp) * accordingly. * * PUBLIC: int __repmgr_set_membership __P((ENV *, - * PUBLIC: const char *, u_int, u_int32_t)); + * PUBLIC: const char *, u_int, u_int32_t, u_int32_t)); * * Caller must host db_rep mutex, and be in ENV_ENTER context. */ int -__repmgr_set_membership(env, host, port, status) +__repmgr_set_membership(env, host, port, status, flags) ENV *env; const char *host; u_int port; u_int32_t status; + u_int32_t flags; { DB_REP *db_rep; REP *rep; @@ -1953,7 +2730,9 @@ __repmgr_set_membership(env, host, port, status) /* Set both private and shared copies of the info. */ site->membership = status; + site->gmdb_flags = flags; sites[eid].status = status; + sites[eid].flags = flags; } MUTEX_UNLOCK(env, rep->mtx_repmgr); @@ -1965,7 +2744,8 @@ __repmgr_set_membership(env, host, port, status) SELECTOR_RUNNING(db_rep)) { if (eid == db_rep->self_eid && status != SITE_PRESENT) - ret = DB_DELETED; + ret = (status == SITE_ADDING) ? + __repmgr_defer_op(env, REPMGR_REJOIN) : DB_DELETED; else if (orig != SITE_PRESENT && status == SITE_PRESENT && site->state == SITE_IDLE) { /* @@ -1981,10 +2761,11 @@ __repmgr_set_membership(env, host, port, status) * failure shouldn't hurt anything, because we'll just * naturally try again later. */ - ret = __repmgr_schedule_connection_attempt(env, - eid, TRUE); - if (eid != db_rep->self_eid) + if (eid != db_rep->self_eid) { + ret = __repmgr_schedule_connection_attempt(env, + eid, TRUE); DB_EVENT(env, DB_EVENT_REP_SITE_ADDED, &eid); + } } else if (orig != 0 && status == 0) DB_EVENT(env, DB_EVENT_REP_SITE_REMOVED, &eid); @@ -2084,3 +2865,73 @@ __repmgr_bcast_own_msg(env, type, buf, len) } return (0); } + +/* + * PUBLIC: int __repmgr_bcast_member_list __P((ENV *)); + * + * Broadcast membership list to all other sites in the replication group. + * + * Caller must hold mutex. + */ +int +__repmgr_bcast_member_list(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + REPMGR_SITE *site; + u_int8_t *buf, *v4buf; + size_t len, v4len; + int ret; + u_int i; + + db_rep = env->rep_handle; + if (!SELECTOR_RUNNING(db_rep)) + return (0); + buf = NULL; + v4buf = NULL; + LOCK_MUTEX(db_rep->mutex); + /* + * Some of the other sites in the replication group might be at + * an older version, so we need to be able to send the membership + * list in the current or older format. + */ + if ((ret = __repmgr_marshal_member_list(env, + DB_REPMGR_VERSION, &buf, &len)) != 0 || + (ret = __repmgr_marshal_member_list(env, + 4, &v4buf, &v4len)) != 0) { + UNLOCK_MUTEX(db_rep->mutex); + goto out; + } + UNLOCK_MUTEX(db_rep->mutex); + + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Broadcast latest membership list")); + FOR_EACH_REMOTE_SITE_INDEX(i) { + site = SITE_FROM_EID(i); + if (site->state != SITE_CONNECTED) + continue; + if ((conn = site->ref.conn.in) != NULL && + conn->state == CONN_READY && + (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING, + (conn->version < 5 ? v4buf : buf), + (conn->version < 5 ? (u_int32_t) v4len : (u_int32_t)len))) + != 0 && + (ret = __repmgr_bust_connection(env, conn)) != 0) + goto out; + if ((conn = site->ref.conn.out) != NULL && + conn->state == CONN_READY && + (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING, + (conn->version < 5 ? v4buf : buf), + (conn->version < 5 ? (u_int32_t)v4len : (u_int32_t)len))) + != 0 && + (ret = __repmgr_bust_connection(env, conn)) != 0) + goto out; + } +out: + if (buf != NULL) + __os_free(env, buf); + if (v4buf != NULL) + __os_free(env, v4buf); + return (ret); +} |
