From ada9e95c1bda5995d87b0e2830bcf1a935bcfc4a Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Wed, 8 Nov 2017 19:31:05 +1100 Subject: ctdb-common: Add special monitor handling to run_event abstraction Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke --- ctdb/common/run_event.c | 145 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 135 insertions(+), 10 deletions(-) (limited to 'ctdb') diff --git a/ctdb/common/run_event.c b/ctdb/common/run_event.c index 0961d657a74..e230b12f151 100644 --- a/ctdb/common/run_event.c +++ b/ctdb/common/run_event.c @@ -271,6 +271,10 @@ struct run_event_context { const char *script_dir; const char *debug_prog; bool debug_running; + + struct tevent_queue *queue; + struct tevent_req *current_req; + bool monitor_running; }; @@ -321,6 +325,14 @@ int run_event_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev, run_ctx->debug_running = false; + run_ctx->queue = tevent_queue_create(run_ctx, "run event queue"); + if (run_ctx->queue == NULL) { + talloc_free(run_ctx); + return ENOMEM; + } + + run_ctx->monitor_running = false; + *out = run_ctx; return 0; } @@ -341,6 +353,32 @@ static const char *run_event_debug_prog(struct run_event_context *run_ctx) return run_ctx->debug_prog; } +static struct tevent_queue *run_event_queue(struct run_event_context *run_ctx) +{ + return run_ctx->queue; +} + +static void run_event_start_running(struct run_event_context *run_ctx, + struct tevent_req *req, bool is_monitor) +{ + run_ctx->current_req = req; + run_ctx->monitor_running = is_monitor; +} + +static void run_event_stop_running(struct run_event_context *run_ctx) +{ + run_ctx->current_req = NULL; + run_ctx->monitor_running = false; +} + +static struct tevent_req *run_event_get_running( + struct run_event_context *run_ctx, + bool *is_monitor) +{ + *is_monitor = run_ctx->monitor_running; + return run_ctx->current_req; +} + static int run_event_script_status(struct run_event_script *script) { int ret; @@ -583,14 +621,18 @@ struct run_event_state { struct tevent_context *ev; struct run_event_context *run_ctx; const char *event_str; + const char *arg_str; struct timeval timeout; struct run_event_script_list *script_list; const char **argv; + struct tevent_req *script_subreq; int index; - int status; + bool cancelled; }; +static void run_event_cancel(struct tevent_req *req); +static void run_event_trigger(struct tevent_req *req, void *private_data); static struct tevent_req *run_event_run_script(struct tevent_req *req); static void run_event_next_script(struct tevent_req *subreq); static void run_event_debug(struct tevent_req *req, pid_t pid); @@ -603,9 +645,9 @@ struct tevent_req *run_event_send(TALLOC_CTX *mem_ctx, const char *arg_str, struct timeval timeout) { - struct tevent_req *req, *subreq; + struct tevent_req *req, *current_req; struct run_event_state *state; - int ret; + bool monitor_running, status; req = tevent_req_create(mem_ctx, &state, struct run_event_state); if (req == NULL) { @@ -618,39 +660,113 @@ struct tevent_req *run_event_send(TALLOC_CTX *mem_ctx, if (tevent_req_nomem(state->event_str, req)) { return tevent_req_post(req, ev); } + if (arg_str != NULL) { + state->arg_str = talloc_strdup(state, arg_str); + if (tevent_req_nomem(state->arg_str, req)) { + return tevent_req_post(req, ev); + } + } state->timeout = timeout; + state->cancelled = false; + + /* + * If monitor event is running, + * cancel the running monitor event and run new event + * + * If any other event is running, + * if new event is monitor, cancel that event + * else add new event to the queue + */ + + current_req = run_event_get_running(run_ctx, &monitor_running); + if (current_req != NULL) { + if (monitor_running) { + run_event_cancel(current_req); + } else if (strcmp(event_str, "monitor") == 0) { + state->script_list = talloc_zero( + state, struct run_event_script_list); + if (tevent_req_nomem(state->script_list, req)) { + return tevent_req_post(req, ev); + } + state->script_list->summary = -ECANCELED; + tevent_req_done(req); + return tevent_req_post(req, ev); + } + } + + status = tevent_queue_add(run_event_queue(run_ctx), ev, req, + run_event_trigger, NULL); + if (! status) { + tevent_req_error(req, ENOMEM); + return tevent_req_post(req, ev); + } + + return req; +} - ret = get_script_list(state, run_event_script_dir(run_ctx), +static void run_event_cancel(struct tevent_req *req) +{ + struct run_event_state *state = tevent_req_data( + req, struct run_event_state); + + run_event_stop_running(state->run_ctx); + + state->script_list->summary = -ECANCELED; + state->cancelled = true; + + TALLOC_FREE(state->script_subreq); + + tevent_req_done(req); +} + +static void run_event_trigger(struct tevent_req *req, void *private_data) +{ + struct tevent_req *subreq; + struct run_event_state *state = tevent_req_data( + req, struct run_event_state); + int ret; + bool is_monitor = false; + + D_DEBUG("Running event %s with args \"%s\"\n", state->event_str, + state->arg_str == NULL ? "(null)" : state->arg_str); + + ret = get_script_list(state, run_event_script_dir(state->run_ctx), &state->script_list); if (ret != 0) { D_ERR("get_script_list() failed, ret=%d\n", ret); tevent_req_error(req, ret); - return tevent_req_post(req, ev); + return; } /* No scripts */ if (state->script_list == NULL || state->script_list->num_scripts == 0) { tevent_req_done(req); - return tevent_req_post(req, ev); + return; } - ret = script_args(state, event_str, arg_str, &state->argv); + ret = script_args(state, state->event_str, state->arg_str, + &state->argv); if (ret != 0) { D_ERR("script_args() failed, ret=%d\n", ret); tevent_req_error(req, ret); - return tevent_req_post(req, ev); + return; } state->index = 0; subreq = run_event_run_script(req); if (tevent_req_nomem(subreq, req)) { - return tevent_req_post(req, ev); + return; } tevent_req_set_callback(subreq, run_event_next_script, req); - return req; + state->script_subreq = subreq; + + if (strcmp(state->event_str, "monitor") == 0) { + is_monitor = true; + } + run_event_start_running(state->run_ctx, req, is_monitor); } static struct tevent_req *run_event_run_script(struct tevent_req *req) @@ -702,12 +818,17 @@ static void run_event_next_script(struct tevent_req *subreq) status = run_proc_recv(subreq, &ret, &script->result, &pid, state->script_list, &script->output); TALLOC_FREE(subreq); + state->script_subreq = NULL; if (! status) { D_ERR("run_proc failed for %s, ret=%d\n", script->name, ret); tevent_req_error(req, ret); return; } + if (state->cancelled) { + return; + } + /* Log output */ if (script->output != NULL) { debug_log(DEBUG_ERR, script->output, script->name); @@ -731,6 +852,7 @@ static void run_event_next_script(struct tevent_req *subreq) D_NOTICE("%s event %s\n", state->event_str, (script->summary == -ETIME) ? "timed out" : "failed"); + run_event_stop_running(state->run_ctx); tevent_req_done(req); return; } @@ -739,6 +861,7 @@ static void run_event_next_script(struct tevent_req *subreq) /* All scripts executed */ if (state->index >= state->script_list->num_scripts) { + run_event_stop_running(state->run_ctx); tevent_req_done(req); return; } @@ -748,6 +871,8 @@ static void run_event_next_script(struct tevent_req *subreq) return; } tevent_req_set_callback(subreq, run_event_next_script, req); + + state->script_subreq = subreq; } static void run_event_debug(struct tevent_req *req, pid_t pid) -- cgit v1.2.1