diff options
author | Ben Pfaff <blp@ovn.org> | 2016-04-15 16:21:59 -0700 |
---|---|---|
committer | Ben Pfaff <blp@ovn.org> | 2016-04-15 16:21:59 -0700 |
commit | a6dfaec4257a2c3c0057e9a02d3f6034bbc756f7 (patch) | |
tree | f21a98992c03bd1b67b692d75fb132bcb56a886e | |
parent | b914715a354db1ad3761556e076ba2607340cef6 (diff) | |
download | openvswitch-a6dfaec4257a2c3c0057e9a02d3f6034bbc756f7.tar.gz |
work on raft_rpc_from_jsonrpc
-rw-r--r-- | lib/raft.c | 321 |
1 files changed, 248 insertions, 73 deletions
diff --git a/lib/raft.c b/lib/raft.c index 122acdb2d..44a4af076 100644 --- a/lib/raft.c +++ b/lib/raft.c @@ -161,32 +161,62 @@ struct raft { int n_votes; /* Number of votes for me. */ }; -enum raft_rpc_type { - /* AppendEntries RPC. */ - RAFT_RPC_APPEND_REQUEST, - RAFT_RPC_APPEND_REPLY, - - /* RequestVote RPC. */ - RAFT_RPC_VOTE_REQUEST, - RAFT_RPC_VOTE_REPLY, +#define RAFT_RPC_TYPES \ + /* AppendEntries RPC. */ \ + RAFT_RPC(RAFT_RPC_APPEND_REQUEST, "append_request") \ + RAFT_RPC(RAFT_RPC_APPEND_REPLY, "append_reply") \ + \ + /* RequestVote RPC. */ \ + RAFT_RPC(RAFT_RPC_VOTE_REQUEST, "vote_request") \ + RAFT_RPC(RAFT_RPC_VOTE_REPLY, "vote_reply") \ + \ + /* AddServer RPC. */ \ + RAFT_RPC(RAFT_RPC_ADD_SERVER_REQUEST, "add_server_request") \ + RAFT_RPC(RAFT_RPC_ADD_SERVER_REPLY, "add_server_reply") \ + \ + /* RemoveServer RPC. */ \ + RAFT_RPC(RAFT_RPC_REMOVE_SERVER_REQUEST, "remove_server_request") \ + RAFT_RPC(RAFT_RPC_REMOVE_SERVER_REPLY, "remove_server_reply") \ + \ + /* InstallSnapshot RPC. */ \ + RAFT_RPC(RAFT_RPC_INSTALL_SNAPSHOT_REQUEST, "install_snapshot_request") \ + RAFT_RPC(RAFT_RPC_INSTALL_SNAPSHOT_REPLY, "install_snapshot_reply") - /* AddServer RPC. */ - RAFT_RPC_ADD_SERVER_REQUEST, - RAFT_RPC_ADD_SERVER_REPLY, +enum raft_rpc_type { +#define RAFT_RPC(ENUM, NAME) ENUM, + RAFT_RPC_TYPES +#undef RAFT_RPC +}; - /* RemoveServer RPC. */ - RAFT_RPC_REMOVE_SERVER_REQUEST, - RAFT_RPC_REMOVE_SERVER_REPLY, +static const char * +raft_rpc_type_to_string(enum raft_rpc_type status) +{ + switch (status) { +#define RAFT_RPC(ENUM, NAME) case ENUM: return NAME; + RAFT_RPC_TYPES +#undef RAFT_RPC + } + return "<unknown>"; +} - /* InstallSnapshot RPC. */ - RAFT_RPC_SNAPSHOT_REQUEST, - RAFT_RPC_SNAPSHOT_REPLY, -}; +static bool +raft_rpc_type_from_string(const char *s, enum raft_rpc_type *status) +{ +#define RAFT_RPC(ENUM, NAME) \ + if (!strcmp(s, NAME)) { \ + *status = ENUM; \ + return true; \ + } + RAFT_RPC_TYPES +#undef RAFT_RPC + return false; +} struct raft_rpc_common { enum raft_rpc_type type; /* One of RAFT_RPC_*. */ struct uuid sid; /* SID of peer server. */ - struct uuid xid; /* To match up requests and replies. */ + struct uuid xid; /* To match up requests and replies + * XXX---is this needed?. */ }; struct raft_append_request { @@ -330,7 +360,7 @@ struct raft_server_reply { struct uuid leader_sid; }; -struct raft_snapshot_request { +struct raft_install_snapshot_request { struct raft_rpc_common common; uint64_t term; /* Leader's term. */ @@ -340,10 +370,19 @@ struct raft_snapshot_request { uint64_t last_term; /* Term of last_index. */ /* Server configuration. */ - struct uuid *servers; + struct { + struct uuid sid; + char *address; + } *servers; size_t n_servers; - /* Data. */ + /* Data. + * + * The data must be a valid UTF-8 string, because it is going to be sent as + * a JSON string. That means that chunks must not be chosen so as to break + * apart multibyte characters (because that would create invalid UTF-8). + * + * The data need not be null-terminated. */ uint64_t offset; const char *data; size_t len; @@ -352,9 +391,10 @@ struct raft_snapshot_request { bool done; }; -struct raft_snapshot_reply { +struct raft_install_snapshot_reply { struct raft_rpc_common common; + /* XXX how do we handle lost fragments? */ uint64_t term; /* For leader to update itself. */ }; @@ -366,8 +406,8 @@ union raft_rpc { struct raft_vote_reply vote_reply; struct raft_server_request server_request; struct raft_server_reply server_reply; - struct raft_snapshot_request snapshot_request; - struct raft_snapshot_reply snapshot_reply; + struct raft_install_snapshot_request install_snapshot_request; + struct raft_install_snapshot_reply install_snapshot_reply; }; static struct raft_server * @@ -485,8 +525,9 @@ raft_create(const char *file_name, const char *local_address, /* Write snapshot record. */ struct json *prev_servers = json_object_create(); - json_object_put(prev_servers, xasprintf(UUID_FMT, UUID_ARGS(&sid)), - json_string_create(local_address)); + char sid_s[UUID_LEN + 1]; + sprintf(sid_s, UUID_FMT, UUID_ARGS(&sid)); + json_object_put(prev_servers, sid_s, json_string_create(local_address)); struct json *snapshot = json_object_create(); json_object_put(snapshot, "prev_term", json_integer_create(0)); json_object_put(snapshot, "prev_index", json_integer_create(0)); @@ -531,7 +572,7 @@ raft_add_entry(struct raft *raft, } static uint64_t -parse_integer(struct ovsdb_parser *p, const char *name) +parse_uint(struct ovsdb_parser *p, const char *name) { const struct json *json = ovsdb_parser_member(p, name, OP_INTEGER); return json ? json_integer(json) : 0; @@ -661,7 +702,7 @@ parse_log_record(struct raft *raft, const struct json *entry) ovsdb_parser_init(&p, entry, "raft log entry"); /* Parse "term". */ - uint64_t term = parse_integer(&p, "term"); + uint64_t term = parse_uint(&p, "term"); if (term < raft->current_term) { ovsdb_parser_raise_error(&p, "log entry term %"PRIu64" precedes " "current term %"PRIu64".", @@ -764,8 +805,8 @@ raft_open(const char *file_name, struct raft **raftp) goto error; } ovsdb_parser_init(&p, snapshot, "raft snapshot"); - raft->prev_term = parse_integer(&p, "prev_term"); - raft->log_start = raft->log_end = parse_integer(&p, "prev_index") + 1; + raft->prev_term = parse_uint(&p, "prev_term"); + raft->log_start = raft->log_end = parse_uint(&p, "prev_index") + 1; const struct json *prev_servers_json = ovsdb_parser_member( &p, "prev_servers", OP_OBJECT); const struct json *data = ovsdb_parser_member( @@ -903,14 +944,10 @@ raft_run(struct raft *raft) jsonrpc_session_run(s->conn); } } + +/* raft_rpc_to_jsonrpc(). */ -static char * -format_u64(uint64_t x) -{ - return xasprintf("%"PRIu64, x); -} - -static const char * +static void raft_append_request_to_jsonrpc(const struct raft_append_request *rq, struct json *args) { @@ -937,11 +974,9 @@ raft_append_request_to_jsonrpc(const struct raft_append_request *rq, entries[i] = dst; } json_object_put(args, "log", json_array_create(entries, rq->n_entries)); - - return "append_request"; } -static const char * +static void raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy, struct json *args) { @@ -951,27 +986,24 @@ raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy, json_object_put_uint(args, "prev_log_term", rpy->prev_log_term); json_object_put_uint(args, "n_entries", rpy->n_entries); json_object_put(args, "success", json_boolean_create(rpy->success)); - return "append_reply"; } -static const char * +static void raft_vote_request_to_jsonrpc(const struct raft_vote_request *rq, struct json *args) { json_object_put_uint(args, "term", rq->term); json_object_put_uint(args, "last_log_index", rq->last_log_index); json_object_put_uint(args, "last_log_term", rq->last_log_term); - return "vote_request"; } -static const char * +static void raft_vote_reply_to_jsonrpc(const struct raft_vote_reply *rpy, struct json *args) { json_object_put_uint(args, "term", rpy->term); json_object_put(args, "vote_granted", json_boolean_create(rpy->vote_granted)); - return "vote_reply"; } static void @@ -997,6 +1029,38 @@ raft_server_reply_to_jsonrpc(const struct raft_server_reply *rpy, } } +static void +raft_install_snapshot_request_to_jsonrpc( + const struct raft_install_snapshot_request *rq, struct json *args) +{ + json_object_put_uint(args, "term", rq->term); + json_object_put_format(args, "leader", + UUID_FMT, UUID_ARGS(&rq->leader_sid)); + json_object_put_uint(args, "last_index", rq->last_index); + json_object_put_uint(args, "last_term", rq->last_term); + + struct json *servers = json_object_create(); + for (size_t i = 0; i < rq->n_servers; i++) { + char sid_s[UUID_LEN + 1]; + sprintf(sid_s, UUID_FMT, UUID_ARGS(&rq->servers[i].sid)); + json_object_put_string(servers, sid_s, rq->servers[i].address); + } + json_object_put(args, "servers", servers); + + json_object_put_uint(args, "offset", rq->offset); + json_object_put(args, "data", + json_string_create_nocopy(xmemdup0(rq->data, rq->len))); + + json_object_put(args, "done", json_boolean_create(rq->done)); +} + +static void +raft_install_snapshot_reply_to_jsonrpc( + const struct raft_install_snapshot_reply *rpy, struct json *args) +{ + json_object_put_uint(args, "term", rpy->term); +} + static struct jsonrpc_msg * raft_rpc_to_jsonrpc(const struct raft *raft, const union raft_rpc *rpc) @@ -1006,51 +1070,161 @@ raft_rpc_to_jsonrpc(const struct raft *raft, json_object_put_format(args, "from", UUID_FMT, UUID_ARGS(&raft->sid)); json_object_put_format(args, "to", UUID_FMT, UUID_ARGS(&rpc->common.sid)); - const char *method; switch (rpc->common.type) { case RAFT_RPC_APPEND_REQUEST: - method = raft_append_request_to_jsonrpc(&rpc->append_request, + raft_append_request_to_jsonrpc(&rpc->append_request, args); break; case RAFT_RPC_APPEND_REPLY: - method = raft_append_reply_to_jsonrpc(&rpc->append_reply, args); + raft_append_reply_to_jsonrpc(&rpc->append_reply, args); break; case RAFT_RPC_VOTE_REQUEST: - method = raft_vote_request_to_jsonrpc(&rpc->vote_request, args); + raft_vote_request_to_jsonrpc(&rpc->vote_request, args); break; case RAFT_RPC_VOTE_REPLY: - method = raft_vote_reply_to_jsonrpc(&rpc->vote_reply, args); + raft_vote_reply_to_jsonrpc(&rpc->vote_reply, args); break; case RAFT_RPC_ADD_SERVER_REQUEST: raft_server_request_to_jsonrpc(&rpc->server_request, args); - method = "add_server_request"; break; case RAFT_RPC_ADD_SERVER_REPLY: raft_server_reply_to_jsonrpc(&rpc->server_reply, args); - method = "add_server_reply"; break; case RAFT_RPC_REMOVE_SERVER_REQUEST: raft_server_request_to_jsonrpc(&rpc->server_request, args); - method = "remove_server_request"; break; case RAFT_RPC_REMOVE_SERVER_REPLY: raft_server_reply_to_jsonrpc(&rpc->server_reply, args); - method = "remove_server_reply"; break; - case RAFT_RPC_SNAPSHOT_REQUEST: - method = raft_snapshot_request_to_jsonrpc(&rpc->snapshot_request, - args); + case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST: + raft_install_snapshot_request_to_jsonrpc( + &rpc->install_snapshot_request, args); break; - case RAFT_RPC_SNAPSHOT_REPLY: - method = raft_snapshot_reply_to_jsonrpc(&rpc->snapshot_reply, args); + case RAFT_RPC_INSTALL_SNAPSHOT_REPLY: + raft_install_snapshot_reply_to_jsonrpc( + &rpc->install_snapshot_reply, args); break; default: OVS_NOT_REACHED(); } - return jsonrpc_create_notify(method, json_array_create_1(args)); + return jsonrpc_create_notify(raft_rpc_type_to_string(rpc->common.type), + json_array_create_1(args)); } + +/* raft_rpc_from_jsonrpc(). */ +static struct vlog_rate_limit rpc_rl = VLOG_RATE_LIMIT_INIT(5, 5); + +static void +raft_append_request_from_jsonrpc(struct ovsdb_parser *p, + struct raft_append_request *rq) +{ + rq->term = parse_uint(p, "term"); + parse_optional_uuid(p, "leader", &rq->leader_sid); + rq->prev_log_index = parse_uint(p, "prev_log_index"); + rq->prev_log_term = parse_uint(p, "prev_log_term"); + rq->leader_commit = parse_uint(p, "leader_commit"); + + struct json *log = ovsdb_parser_member(p, "log", OP_ARRAY); + if (!log) { + return; + } + struct json_array *entries = json_array(log); + rq->entries = xmalloc(entries->n * sizeof *rq->entries); + rq->n_entries = 0; + for (size_t i = 0; i < entries->n; i++) { + char *error = raft_entry_parse(entries[i], &rq->entries[i]); + if (error) { + ovsdb_parser_raise_error(p, "%s", error); + free(error); + break; + } + rq->n_entries++; + } +} + +static bool +raft_rpc_from_jsonrpc(const struct raft *raft, + const struct jsonrpc_msg *msg, union raft_rpc *rpc) +{ + memset(rpc, 0, sizeof *rpc); + if (msg->type != JSONRPC_NOTIFY) { + VLOG_WARN_RL(&rpc_rl, "expecting notify RPC but received %s", + jsonrpc_msg_type_to_string(msg->type)); + return false; + } + + if (!raft_rpc_type_from_string(msg->method, &rpc->type)) { + VLOG_WARN_RL(&rpc_rl, "unknown method %s", msg->method); + return false; + } + + if (json_array(msg->params)->n != 1) { + VLOG_WARN_RL(&rpc_rl, "%s RPC has %"PRIuSIZE" parameters (expected 1)", + json_array(msg->params)->n); + return false; + } + + struct ovsdb_parser p; + ovsdb_parser_init(&p, json_array(msg->params)->elems[0]); + + struct uuid cid = parse_required_uuid(&p, "cluster"); + if (cid != raft->cid) { + ovsdb_parser_raise_error(&p, "wrong cluster "UUID_FMT" " + "(expected "UUID_FMT")", + UUID_ARGS(&cid), UUID_ARGS(&raft->cid)); + } + + struct uuid to_sid = parse_required_uuid(&p, "to"); + if (sid != raft->sid) { + ovsdb_parser_raise_error(&p, "misrouted message (addressed to " + UUID_FMT" but we're "UUID_FMT")", + UUID_ARGS(&to_sid), UUID_ARGS(&raft->sid)); + } + + rpc->common.sid = parse_required_uuid(&p, "from"); + + switch (rpc->type) { + case RAFT_RPC_APPEND_REQUEST: + raft_append_request_from_jsonrpc(&p, &rpc->append_request); + break; + case RAFT_RPC_APPEND_REPLY: + raft_append_reply_from_jsonrpc(&p, &rpc->append_reply); + break; + case RAFT_RPC_VOTE_REQUEST: + raft_vote_request_from_jsonrpc(&p, &rpc->vote_request); + break; + case RAFT_RPC_VOTE_REPLY: + raft_vote_reply_from_jsonrpc(&p, &rpc->vote_reply); + break; + case RAFT_RPC_ADD_SERVER_REQUEST: + raft_server_request_from_jsonrpc(&p, &rpc->server_request); + break; + case RAFT_RPC_ADD_SERVER_REPLY: + raft_server_reply_from_jsonrpc(&p, &rpc->server_reply); + break; + case RAFT_RPC_REMOVE_SERVER_REQUEST: + raft_server_request_from_jsonrpc(&p, &rpc->server_request); + break; + case RAFT_RPC_REMOVE_SERVER_REPLY: + raft_server_reply_from_jsonrpc(&p, &rpc->server_reply); + break; + case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST: + raft_install_snapshot_request_from_jsonrpc(&p, + &rpc->install_snapshot_request); + break; + case RAFT_RPC_INSTALL_SNAPSHOT_REPLY: + raft_install_snapshot_reply_from_jsonrpc(&p, + &rpc->install_snapshot_reply); + break; + default: + OVS_NOT_REACHED(); + } + + +} + #if 0 static void raft_send_server_reply(struct raft *raft, @@ -1847,8 +2021,8 @@ raft_handle_remove_server_reply(struct raft *raft OVS_UNUSED, } static void -raft_handle_snapshot_request__(struct raft *raft, - const struct raft_snapshot_request *rq) +raft_handle_install_snapshot_request__( + struct raft *raft, const struct raft_install_snapshot_request *rq) { if (!raft_receive_term__(raft, rq->term)) { return; @@ -1858,13 +2032,13 @@ raft_handle_snapshot_request__(struct raft *raft, } static void -raft_handle_snapshot_request(struct raft *raft, - const struct raft_snapshot_request *rq) +raft_handle_install_snapshot_request( + struct raft *raft, const struct raft_install_snapshot_request *rq) { - raft_handle_snapshot_request__(raft, rq); + raft_handle_install_snapshot_request__(raft, rq); union raft_rpc rpy = { - .snapshot_reply = { + .install_snapshot_reply = { .common = { .type = RAFT_RPC_SNAPSHOT_REPLY, .sid = rq->common.sid, @@ -1877,8 +2051,8 @@ raft_handle_snapshot_request(struct raft *raft, } static void -raft_handle_snapshot_reply(struct raft *raft, - const struct raft_snapshot_reply *rpy) +raft_handle_install_snapshot_reply( + struct raft *raft, const struct raft_install_snapshot_reply *rpy) { if (!raft_receive_term__(raft, rpy->term)) { return; @@ -1914,11 +2088,12 @@ raft_receive(struct raft *raft, const union raft_rpc *rpc) case RAFT_RPC_REMOVE_SERVER_REPLY: raft_handle_remove_server_reply(raft, &rpc->server_reply); break; - case RAFT_RPC_SNAPSHOT_REQUEST: - raft_handle_snapshot_request(raft, &rpc->snapshot_request); + case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST: + raft_handle_install_snapshot_request(raft, + &rpc->install_snapshot_request); break; - case RAFT_RPC_SNAPSHOT_REPLY: - raft_handle_snapshot_reply(raft, &rpc->snapshot_reply); + case RAFT_RPC_INSTALL_SNAPSHOT_REPLY: + raft_handle_install_snapshot_reply(raft, &rpc->install_snapshot_reply); break; default: OVS_NOT_REACHED(); |