summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Pfaff <blp@ovn.org>2016-04-15 16:21:59 -0700
committerBen Pfaff <blp@ovn.org>2016-04-15 16:21:59 -0700
commita6dfaec4257a2c3c0057e9a02d3f6034bbc756f7 (patch)
treef21a98992c03bd1b67b692d75fb132bcb56a886e
parentb914715a354db1ad3761556e076ba2607340cef6 (diff)
downloadopenvswitch-a6dfaec4257a2c3c0057e9a02d3f6034bbc756f7.tar.gz
work on raft_rpc_from_jsonrpc
-rw-r--r--lib/raft.c321
1 files changed, 248 insertions, 73 deletions
diff --git a/lib/raft.c b/lib/raft.c
index 122acdb2d..44a4af076 100644
--- a/lib/raft.c
+++ b/lib/raft.c
@@ -161,32 +161,62 @@ struct raft {
int n_votes; /* Number of votes for me. */
};
-enum raft_rpc_type {
- /* AppendEntries RPC. */
- RAFT_RPC_APPEND_REQUEST,
- RAFT_RPC_APPEND_REPLY,
-
- /* RequestVote RPC. */
- RAFT_RPC_VOTE_REQUEST,
- RAFT_RPC_VOTE_REPLY,
+#define RAFT_RPC_TYPES \
+ /* AppendEntries RPC. */ \
+ RAFT_RPC(RAFT_RPC_APPEND_REQUEST, "append_request") \
+ RAFT_RPC(RAFT_RPC_APPEND_REPLY, "append_reply") \
+ \
+ /* RequestVote RPC. */ \
+ RAFT_RPC(RAFT_RPC_VOTE_REQUEST, "vote_request") \
+ RAFT_RPC(RAFT_RPC_VOTE_REPLY, "vote_reply") \
+ \
+ /* AddServer RPC. */ \
+ RAFT_RPC(RAFT_RPC_ADD_SERVER_REQUEST, "add_server_request") \
+ RAFT_RPC(RAFT_RPC_ADD_SERVER_REPLY, "add_server_reply") \
+ \
+ /* RemoveServer RPC. */ \
+ RAFT_RPC(RAFT_RPC_REMOVE_SERVER_REQUEST, "remove_server_request") \
+ RAFT_RPC(RAFT_RPC_REMOVE_SERVER_REPLY, "remove_server_reply") \
+ \
+ /* InstallSnapshot RPC. */ \
+ RAFT_RPC(RAFT_RPC_INSTALL_SNAPSHOT_REQUEST, "install_snapshot_request") \
+ RAFT_RPC(RAFT_RPC_INSTALL_SNAPSHOT_REPLY, "install_snapshot_reply")
- /* AddServer RPC. */
- RAFT_RPC_ADD_SERVER_REQUEST,
- RAFT_RPC_ADD_SERVER_REPLY,
+enum raft_rpc_type {
+#define RAFT_RPC(ENUM, NAME) ENUM,
+ RAFT_RPC_TYPES
+#undef RAFT_RPC
+};
- /* RemoveServer RPC. */
- RAFT_RPC_REMOVE_SERVER_REQUEST,
- RAFT_RPC_REMOVE_SERVER_REPLY,
+static const char *
+raft_rpc_type_to_string(enum raft_rpc_type status)
+{
+ switch (status) {
+#define RAFT_RPC(ENUM, NAME) case ENUM: return NAME;
+ RAFT_RPC_TYPES
+#undef RAFT_RPC
+ }
+ return "<unknown>";
+}
- /* InstallSnapshot RPC. */
- RAFT_RPC_SNAPSHOT_REQUEST,
- RAFT_RPC_SNAPSHOT_REPLY,
-};
+static bool
+raft_rpc_type_from_string(const char *s, enum raft_rpc_type *status)
+{
+#define RAFT_RPC(ENUM, NAME) \
+ if (!strcmp(s, NAME)) { \
+ *status = ENUM; \
+ return true; \
+ }
+ RAFT_RPC_TYPES
+#undef RAFT_RPC
+ return false;
+}
struct raft_rpc_common {
enum raft_rpc_type type; /* One of RAFT_RPC_*. */
struct uuid sid; /* SID of peer server. */
- struct uuid xid; /* To match up requests and replies. */
+ struct uuid xid; /* To match up requests and replies
+ * XXX---is this needed?. */
};
struct raft_append_request {
@@ -330,7 +360,7 @@ struct raft_server_reply {
struct uuid leader_sid;
};
-struct raft_snapshot_request {
+struct raft_install_snapshot_request {
struct raft_rpc_common common;
uint64_t term; /* Leader's term. */
@@ -340,10 +370,19 @@ struct raft_snapshot_request {
uint64_t last_term; /* Term of last_index. */
/* Server configuration. */
- struct uuid *servers;
+ struct {
+ struct uuid sid;
+ char *address;
+ } *servers;
size_t n_servers;
- /* Data. */
+ /* Data.
+ *
+ * The data must be a valid UTF-8 string, because it is going to be sent as
+ * a JSON string. That means that chunks must not be chosen so as to break
+ * apart multibyte characters (because that would create invalid UTF-8).
+ *
+ * The data need not be null-terminated. */
uint64_t offset;
const char *data;
size_t len;
@@ -352,9 +391,10 @@ struct raft_snapshot_request {
bool done;
};
-struct raft_snapshot_reply {
+struct raft_install_snapshot_reply {
struct raft_rpc_common common;
+ /* XXX how do we handle lost fragments? */
uint64_t term; /* For leader to update itself. */
};
@@ -366,8 +406,8 @@ union raft_rpc {
struct raft_vote_reply vote_reply;
struct raft_server_request server_request;
struct raft_server_reply server_reply;
- struct raft_snapshot_request snapshot_request;
- struct raft_snapshot_reply snapshot_reply;
+ struct raft_install_snapshot_request install_snapshot_request;
+ struct raft_install_snapshot_reply install_snapshot_reply;
};
static struct raft_server *
@@ -485,8 +525,9 @@ raft_create(const char *file_name, const char *local_address,
/* Write snapshot record. */
struct json *prev_servers = json_object_create();
- json_object_put(prev_servers, xasprintf(UUID_FMT, UUID_ARGS(&sid)),
- json_string_create(local_address));
+ char sid_s[UUID_LEN + 1];
+ sprintf(sid_s, UUID_FMT, UUID_ARGS(&sid));
+ json_object_put(prev_servers, sid_s, json_string_create(local_address));
struct json *snapshot = json_object_create();
json_object_put(snapshot, "prev_term", json_integer_create(0));
json_object_put(snapshot, "prev_index", json_integer_create(0));
@@ -531,7 +572,7 @@ raft_add_entry(struct raft *raft,
}
static uint64_t
-parse_integer(struct ovsdb_parser *p, const char *name)
+parse_uint(struct ovsdb_parser *p, const char *name)
{
const struct json *json = ovsdb_parser_member(p, name, OP_INTEGER);
return json ? json_integer(json) : 0;
@@ -661,7 +702,7 @@ parse_log_record(struct raft *raft, const struct json *entry)
ovsdb_parser_init(&p, entry, "raft log entry");
/* Parse "term". */
- uint64_t term = parse_integer(&p, "term");
+ uint64_t term = parse_uint(&p, "term");
if (term < raft->current_term) {
ovsdb_parser_raise_error(&p, "log entry term %"PRIu64" precedes "
"current term %"PRIu64".",
@@ -764,8 +805,8 @@ raft_open(const char *file_name, struct raft **raftp)
goto error;
}
ovsdb_parser_init(&p, snapshot, "raft snapshot");
- raft->prev_term = parse_integer(&p, "prev_term");
- raft->log_start = raft->log_end = parse_integer(&p, "prev_index") + 1;
+ raft->prev_term = parse_uint(&p, "prev_term");
+ raft->log_start = raft->log_end = parse_uint(&p, "prev_index") + 1;
const struct json *prev_servers_json = ovsdb_parser_member(
&p, "prev_servers", OP_OBJECT);
const struct json *data = ovsdb_parser_member(
@@ -903,14 +944,10 @@ raft_run(struct raft *raft)
jsonrpc_session_run(s->conn);
}
}
+
+/* raft_rpc_to_jsonrpc(). */
-static char *
-format_u64(uint64_t x)
-{
- return xasprintf("%"PRIu64, x);
-}
-
-static const char *
+static void
raft_append_request_to_jsonrpc(const struct raft_append_request *rq,
struct json *args)
{
@@ -937,11 +974,9 @@ raft_append_request_to_jsonrpc(const struct raft_append_request *rq,
entries[i] = dst;
}
json_object_put(args, "log", json_array_create(entries, rq->n_entries));
-
- return "append_request";
}
-static const char *
+static void
raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy,
struct json *args)
{
@@ -951,27 +986,24 @@ raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy,
json_object_put_uint(args, "prev_log_term", rpy->prev_log_term);
json_object_put_uint(args, "n_entries", rpy->n_entries);
json_object_put(args, "success", json_boolean_create(rpy->success));
- return "append_reply";
}
-static const char *
+static void
raft_vote_request_to_jsonrpc(const struct raft_vote_request *rq,
struct json *args)
{
json_object_put_uint(args, "term", rq->term);
json_object_put_uint(args, "last_log_index", rq->last_log_index);
json_object_put_uint(args, "last_log_term", rq->last_log_term);
- return "vote_request";
}
-static const char *
+static void
raft_vote_reply_to_jsonrpc(const struct raft_vote_reply *rpy,
struct json *args)
{
json_object_put_uint(args, "term", rpy->term);
json_object_put(args, "vote_granted",
json_boolean_create(rpy->vote_granted));
- return "vote_reply";
}
static void
@@ -997,6 +1029,38 @@ raft_server_reply_to_jsonrpc(const struct raft_server_reply *rpy,
}
}
+static void
+raft_install_snapshot_request_to_jsonrpc(
+ const struct raft_install_snapshot_request *rq, struct json *args)
+{
+ json_object_put_uint(args, "term", rq->term);
+ json_object_put_format(args, "leader",
+ UUID_FMT, UUID_ARGS(&rq->leader_sid));
+ json_object_put_uint(args, "last_index", rq->last_index);
+ json_object_put_uint(args, "last_term", rq->last_term);
+
+ struct json *servers = json_object_create();
+ for (size_t i = 0; i < rq->n_servers; i++) {
+ char sid_s[UUID_LEN + 1];
+ sprintf(sid_s, UUID_FMT, UUID_ARGS(&rq->servers[i].sid));
+ json_object_put_string(servers, sid_s, rq->servers[i].address);
+ }
+ json_object_put(args, "servers", servers);
+
+ json_object_put_uint(args, "offset", rq->offset);
+ json_object_put(args, "data",
+ json_string_create_nocopy(xmemdup0(rq->data, rq->len)));
+
+ json_object_put(args, "done", json_boolean_create(rq->done));
+}
+
+static void
+raft_install_snapshot_reply_to_jsonrpc(
+ const struct raft_install_snapshot_reply *rpy, struct json *args)
+{
+ json_object_put_uint(args, "term", rpy->term);
+}
+
static struct jsonrpc_msg *
raft_rpc_to_jsonrpc(const struct raft *raft,
const union raft_rpc *rpc)
@@ -1006,51 +1070,161 @@ raft_rpc_to_jsonrpc(const struct raft *raft,
json_object_put_format(args, "from", UUID_FMT, UUID_ARGS(&raft->sid));
json_object_put_format(args, "to", UUID_FMT, UUID_ARGS(&rpc->common.sid));
- const char *method;
switch (rpc->common.type) {
case RAFT_RPC_APPEND_REQUEST:
- method = raft_append_request_to_jsonrpc(&rpc->append_request,
+ raft_append_request_to_jsonrpc(&rpc->append_request,
args);
break;
case RAFT_RPC_APPEND_REPLY:
- method = raft_append_reply_to_jsonrpc(&rpc->append_reply, args);
+ raft_append_reply_to_jsonrpc(&rpc->append_reply, args);
break;
case RAFT_RPC_VOTE_REQUEST:
- method = raft_vote_request_to_jsonrpc(&rpc->vote_request, args);
+ raft_vote_request_to_jsonrpc(&rpc->vote_request, args);
break;
case RAFT_RPC_VOTE_REPLY:
- method = raft_vote_reply_to_jsonrpc(&rpc->vote_reply, args);
+ raft_vote_reply_to_jsonrpc(&rpc->vote_reply, args);
break;
case RAFT_RPC_ADD_SERVER_REQUEST:
raft_server_request_to_jsonrpc(&rpc->server_request, args);
- method = "add_server_request";
break;
case RAFT_RPC_ADD_SERVER_REPLY:
raft_server_reply_to_jsonrpc(&rpc->server_reply, args);
- method = "add_server_reply";
break;
case RAFT_RPC_REMOVE_SERVER_REQUEST:
raft_server_request_to_jsonrpc(&rpc->server_request, args);
- method = "remove_server_request";
break;
case RAFT_RPC_REMOVE_SERVER_REPLY:
raft_server_reply_to_jsonrpc(&rpc->server_reply, args);
- method = "remove_server_reply";
break;
- case RAFT_RPC_SNAPSHOT_REQUEST:
- method = raft_snapshot_request_to_jsonrpc(&rpc->snapshot_request,
- args);
+ case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
+ raft_install_snapshot_request_to_jsonrpc(
+ &rpc->install_snapshot_request, args);
break;
- case RAFT_RPC_SNAPSHOT_REPLY:
- method = raft_snapshot_reply_to_jsonrpc(&rpc->snapshot_reply, args);
+ case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
+ raft_install_snapshot_reply_to_jsonrpc(
+ &rpc->install_snapshot_reply, args);
break;
default:
OVS_NOT_REACHED();
}
- return jsonrpc_create_notify(method, json_array_create_1(args));
+ return jsonrpc_create_notify(raft_rpc_type_to_string(rpc->common.type),
+ json_array_create_1(args));
}
+
+/* raft_rpc_from_jsonrpc(). */
+static struct vlog_rate_limit rpc_rl = VLOG_RATE_LIMIT_INIT(5, 5);
+
+static void
+raft_append_request_from_jsonrpc(struct ovsdb_parser *p,
+ struct raft_append_request *rq)
+{
+ rq->term = parse_uint(p, "term");
+ parse_optional_uuid(p, "leader", &rq->leader_sid);
+ rq->prev_log_index = parse_uint(p, "prev_log_index");
+ rq->prev_log_term = parse_uint(p, "prev_log_term");
+ rq->leader_commit = parse_uint(p, "leader_commit");
+
+ struct json *log = ovsdb_parser_member(p, "log", OP_ARRAY);
+ if (!log) {
+ return;
+ }
+ struct json_array *entries = json_array(log);
+ rq->entries = xmalloc(entries->n * sizeof *rq->entries);
+ rq->n_entries = 0;
+ for (size_t i = 0; i < entries->n; i++) {
+ char *error = raft_entry_parse(entries[i], &rq->entries[i]);
+ if (error) {
+ ovsdb_parser_raise_error(p, "%s", error);
+ free(error);
+ break;
+ }
+ rq->n_entries++;
+ }
+}
+
+static bool
+raft_rpc_from_jsonrpc(const struct raft *raft,
+ const struct jsonrpc_msg *msg, union raft_rpc *rpc)
+{
+ memset(rpc, 0, sizeof *rpc);
+ if (msg->type != JSONRPC_NOTIFY) {
+ VLOG_WARN_RL(&rpc_rl, "expecting notify RPC but received %s",
+ jsonrpc_msg_type_to_string(msg->type));
+ return false;
+ }
+
+ if (!raft_rpc_type_from_string(msg->method, &rpc->type)) {
+ VLOG_WARN_RL(&rpc_rl, "unknown method %s", msg->method);
+ return false;
+ }
+
+ if (json_array(msg->params)->n != 1) {
+ VLOG_WARN_RL(&rpc_rl, "%s RPC has %"PRIuSIZE" parameters (expected 1)",
+ json_array(msg->params)->n);
+ return false;
+ }
+
+ struct ovsdb_parser p;
+ ovsdb_parser_init(&p, json_array(msg->params)->elems[0]);
+
+ struct uuid cid = parse_required_uuid(&p, "cluster");
+ if (cid != raft->cid) {
+ ovsdb_parser_raise_error(&p, "wrong cluster "UUID_FMT" "
+ "(expected "UUID_FMT")",
+ UUID_ARGS(&cid), UUID_ARGS(&raft->cid));
+ }
+
+ struct uuid to_sid = parse_required_uuid(&p, "to");
+ if (sid != raft->sid) {
+ ovsdb_parser_raise_error(&p, "misrouted message (addressed to "
+ UUID_FMT" but we're "UUID_FMT")",
+ UUID_ARGS(&to_sid), UUID_ARGS(&raft->sid));
+ }
+
+ rpc->common.sid = parse_required_uuid(&p, "from");
+
+ switch (rpc->type) {
+ case RAFT_RPC_APPEND_REQUEST:
+ raft_append_request_from_jsonrpc(&p, &rpc->append_request);
+ break;
+ case RAFT_RPC_APPEND_REPLY:
+ raft_append_reply_from_jsonrpc(&p, &rpc->append_reply);
+ break;
+ case RAFT_RPC_VOTE_REQUEST:
+ raft_vote_request_from_jsonrpc(&p, &rpc->vote_request);
+ break;
+ case RAFT_RPC_VOTE_REPLY:
+ raft_vote_reply_from_jsonrpc(&p, &rpc->vote_reply);
+ break;
+ case RAFT_RPC_ADD_SERVER_REQUEST:
+ raft_server_request_from_jsonrpc(&p, &rpc->server_request);
+ break;
+ case RAFT_RPC_ADD_SERVER_REPLY:
+ raft_server_reply_from_jsonrpc(&p, &rpc->server_reply);
+ break;
+ case RAFT_RPC_REMOVE_SERVER_REQUEST:
+ raft_server_request_from_jsonrpc(&p, &rpc->server_request);
+ break;
+ case RAFT_RPC_REMOVE_SERVER_REPLY:
+ raft_server_reply_from_jsonrpc(&p, &rpc->server_reply);
+ break;
+ case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
+ raft_install_snapshot_request_from_jsonrpc(&p,
+ &rpc->install_snapshot_request);
+ break;
+ case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
+ raft_install_snapshot_reply_from_jsonrpc(&p,
+ &rpc->install_snapshot_reply);
+ break;
+ default:
+ OVS_NOT_REACHED();
+ }
+
+
+}
+
#if 0
static void
raft_send_server_reply(struct raft *raft,
@@ -1847,8 +2021,8 @@ raft_handle_remove_server_reply(struct raft *raft OVS_UNUSED,
}
static void
-raft_handle_snapshot_request__(struct raft *raft,
- const struct raft_snapshot_request *rq)
+raft_handle_install_snapshot_request__(
+ struct raft *raft, const struct raft_install_snapshot_request *rq)
{
if (!raft_receive_term__(raft, rq->term)) {
return;
@@ -1858,13 +2032,13 @@ raft_handle_snapshot_request__(struct raft *raft,
}
static void
-raft_handle_snapshot_request(struct raft *raft,
- const struct raft_snapshot_request *rq)
+raft_handle_install_snapshot_request(
+ struct raft *raft, const struct raft_install_snapshot_request *rq)
{
- raft_handle_snapshot_request__(raft, rq);
+ raft_handle_install_snapshot_request__(raft, rq);
union raft_rpc rpy = {
- .snapshot_reply = {
+ .install_snapshot_reply = {
.common = {
.type = RAFT_RPC_SNAPSHOT_REPLY,
.sid = rq->common.sid,
@@ -1877,8 +2051,8 @@ raft_handle_snapshot_request(struct raft *raft,
}
static void
-raft_handle_snapshot_reply(struct raft *raft,
- const struct raft_snapshot_reply *rpy)
+raft_handle_install_snapshot_reply(
+ struct raft *raft, const struct raft_install_snapshot_reply *rpy)
{
if (!raft_receive_term__(raft, rpy->term)) {
return;
@@ -1914,11 +2088,12 @@ raft_receive(struct raft *raft, const union raft_rpc *rpc)
case RAFT_RPC_REMOVE_SERVER_REPLY:
raft_handle_remove_server_reply(raft, &rpc->server_reply);
break;
- case RAFT_RPC_SNAPSHOT_REQUEST:
- raft_handle_snapshot_request(raft, &rpc->snapshot_request);
+ case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
+ raft_handle_install_snapshot_request(raft,
+ &rpc->install_snapshot_request);
break;
- case RAFT_RPC_SNAPSHOT_REPLY:
- raft_handle_snapshot_reply(raft, &rpc->snapshot_reply);
+ case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
+ raft_handle_install_snapshot_reply(raft, &rpc->install_snapshot_reply);
break;
default:
OVS_NOT_REACHED();