/* CTDB client code Copyright (C) Amitay Isaacs 2015 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, see . */ #include "replace.h" #include "system/network.h" #include "system/filesys.h" #include #include #include #include "common/logging.h" #include "lib/tdb_wrap/tdb_wrap.h" #include "lib/util/tevent_unix.h" #include "lib/util/dlinklist.h" #include "lib/util/debug.h" #include "protocol/protocol.h" #include "protocol/protocol_api.h" #include "client/client_private.h" #include "client/client.h" static struct ctdb_db_context *client_db_handle( struct ctdb_client_context *client, const char *db_name) { struct ctdb_db_context *db; for (db = client->db; db != NULL; db = db->next) { if (strcmp(db_name, db->db_name) == 0) { return db; } } return NULL; } static bool ctdb_db_persistent(struct ctdb_db_context *db) { if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) { return true; } return false; } static bool ctdb_db_replicated(struct ctdb_db_context *db) { if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) { return true; } return false; } static bool ctdb_db_volatile(struct ctdb_db_context *db) { if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT || db->db_flags & CTDB_DB_FLAGS_REPLICATED) { return false; } return true; } struct ctdb_set_db_flags_state { struct tevent_context *ev; struct ctdb_client_context *client; struct timeval timeout; uint32_t db_id; uint8_t db_flags; bool readonly_done, sticky_done; uint32_t *pnn_list; int count; }; static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq); static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq); static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq); static struct tevent_req *ctdb_set_db_flags_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, uint32_t destnode, struct timeval timeout, uint32_t db_id, uint8_t db_flags) { struct tevent_req *req, *subreq; struct ctdb_set_db_flags_state *state; struct ctdb_req_control request; req = tevent_req_create(mem_ctx, &state, struct ctdb_set_db_flags_state); if (req == NULL) { return NULL; } if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) { tevent_req_done(req); return tevent_req_post(req, ev); } state->ev = ev; state->client = client; state->timeout = timeout; state->db_id = db_id; state->db_flags = db_flags; ctdb_req_control_get_nodemap(&request); subreq = ctdb_client_control_send(state, ev, client, destnode, timeout, &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req); return req; } static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_set_db_flags_state *state = tevent_req_data( req, struct ctdb_set_db_flags_state); struct ctdb_req_control request; struct ctdb_reply_control *reply; struct ctdb_node_map *nodemap; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n", state->db_id, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap); talloc_free(reply); if (ret != 0) { DEBUG(DEBUG_ERR, ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n", state->db_id, ret)); tevent_req_error(req, ret); return; } state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN, state, &state->pnn_list); talloc_free(nodemap); if (state->count <= 0) { DEBUG(DEBUG_ERR, ("set_db_flags: 0x%08x no connected nodes, count=%d\n", state->db_id, state->count)); tevent_req_error(req, ENOMEM); return; } if (state->db_flags & CTDB_DB_FLAGS_READONLY) { ctdb_req_control_set_db_readonly(&request, state->db_id); subreq = ctdb_client_control_multi_send( state, state->ev, state->client, state->pnn_list, state->count, state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_set_db_flags_readonly_done, req); } else { state->readonly_done = true; } if (state->db_flags & CTDB_DB_FLAGS_STICKY) { ctdb_req_control_set_db_sticky(&request, state->db_id); subreq = ctdb_client_control_multi_send( state, state->ev, state->client, state->pnn_list, state->count, state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done, req); } else { state->sticky_done = true; } } static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_set_db_flags_state *state = tevent_req_data( req, struct ctdb_set_db_flags_state); int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, NULL); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n", state->db_id, ret)); tevent_req_error(req, ret); return; } state->readonly_done = true; if (state->readonly_done && state->sticky_done) { tevent_req_done(req); } } static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_set_db_flags_state *state = tevent_req_data( req, struct ctdb_set_db_flags_state); int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL, NULL); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n", state->db_id, ret)); tevent_req_error(req, ret); return; } state->sticky_done = true; if (state->readonly_done && state->sticky_done) { tevent_req_done(req); } } static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr) { int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } struct ctdb_attach_state { struct tevent_context *ev; struct ctdb_client_context *client; struct timeval timeout; uint32_t destnode; uint8_t db_flags; struct ctdb_db_context *db; }; static void ctdb_attach_dbid_done(struct tevent_req *subreq); static void ctdb_attach_dbpath_done(struct tevent_req *subreq); static void ctdb_attach_health_done(struct tevent_req *subreq); static void ctdb_attach_flags_done(struct tevent_req *subreq); static void ctdb_attach_open_flags_done(struct tevent_req *subreq); struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct timeval timeout, const char *db_name, uint8_t db_flags) { struct tevent_req *req, *subreq; struct ctdb_attach_state *state; struct ctdb_req_control request; req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state); if (req == NULL) { return NULL; } state->db = client_db_handle(client, db_name); if (state->db != NULL) { tevent_req_done(req); return tevent_req_post(req, ev); } state->ev = ev; state->client = client; state->timeout = timeout; state->destnode = ctdb_client_pnn(client); state->db_flags = db_flags; state->db = talloc_zero(client, struct ctdb_db_context); if (tevent_req_nomem(state->db, req)) { return tevent_req_post(req, ev); } state->db->db_name = talloc_strdup(state->db, db_name); if (tevent_req_nomem(state->db, req)) { return tevent_req_post(req, ev); } state->db->db_flags = db_flags; if (ctdb_db_persistent(state->db)) { ctdb_req_control_db_attach_persistent(&request, state->db->db_name); } else if (ctdb_db_replicated(state->db)) { ctdb_req_control_db_attach_replicated(&request, state->db->db_name); } else { ctdb_req_control_db_attach(&request, state->db->db_name); } subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req); return req; } static void ctdb_attach_dbid_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_attach_state *state = tevent_req_data( req, struct ctdb_attach_state); struct ctdb_req_control request; struct ctdb_reply_control *reply; bool status; int ret; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n", state->db->db_name, (ctdb_db_persistent(state->db) ? "DB_ATTACH_PERSISTENT" : (ctdb_db_replicated(state->db) ? "DB_ATTACH_REPLICATED" : "DB_ATTACH")), ret)); tevent_req_error(req, ret); return; } if (ctdb_db_persistent(state->db)) { ret = ctdb_reply_control_db_attach_persistent( reply, &state->db->db_id); } else if (ctdb_db_replicated(state->db)) { ret = ctdb_reply_control_db_attach_replicated( reply, &state->db->db_id); } else { ret = ctdb_reply_control_db_attach(reply, &state->db->db_id); } talloc_free(reply); if (ret != 0) { DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n", state->db->db_name, ret)); tevent_req_error(req, ret); return; } ctdb_req_control_getdbpath(&request, state->db->db_id); subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req); } static void ctdb_attach_dbpath_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_attach_state *state = tevent_req_data( req, struct ctdb_attach_state); struct ctdb_reply_control *reply; struct ctdb_req_control request; bool status; int ret; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n", state->db->db_name, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_getdbpath(reply, state->db, &state->db->db_path); talloc_free(reply); if (ret != 0) { DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n", state->db->db_name, ret)); tevent_req_error(req, ret); return; } ctdb_req_control_db_get_health(&request, state->db->db_id); subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_attach_health_done, req); } static void ctdb_attach_health_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_attach_state *state = tevent_req_data( req, struct ctdb_attach_state); struct ctdb_reply_control *reply; const char *reason; bool status; int ret; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n", state->db->db_name, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_db_get_health(reply, state, &reason); if (ret != 0) { DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n", state->db->db_name, ret)); tevent_req_error(req, ret); return; } if (reason != NULL) { /* Database unhealthy, avoid attach */ DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n", state->db->db_name, reason)); tevent_req_error(req, EIO); return; } subreq = ctdb_set_db_flags_send(state, state->ev, state->client, state->destnode, state->timeout, state->db->db_id, state->db_flags); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_attach_flags_done, req); } static void ctdb_attach_flags_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_attach_state *state = tevent_req_data( req, struct ctdb_attach_state); struct ctdb_req_control request; bool status; int ret; status = ctdb_set_db_flags_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n", state->db->db_name, state->db_flags)); tevent_req_error(req, ret); return; } ctdb_req_control_db_open_flags(&request, state->db->db_id); subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req); } static void ctdb_attach_open_flags_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_attach_state *state = tevent_req_data( req, struct ctdb_attach_state); struct ctdb_reply_control *reply; bool status; int ret, tdb_flags; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n", state->db->db_name, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags); talloc_free(reply); if (ret != 0) { DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed," " ret=%d\n", state->db->db_name, ret)); tevent_req_error(req, ret); return; } state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0, tdb_flags, O_RDWR, 0); if (tevent_req_nomem(state->db->ltdb, req)) { DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n", state->db->db_name)); return; } DLIST_ADD(state->client->db, state->db); tevent_req_done(req); } bool ctdb_attach_recv(struct tevent_req *req, int *perr, struct ctdb_db_context **out) { struct ctdb_attach_state *state = tevent_req_data( req, struct ctdb_attach_state); int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } if (out != NULL) { *out = state->db; } return true; } int ctdb_attach(struct tevent_context *ev, struct ctdb_client_context *client, struct timeval timeout, const char *db_name, uint8_t db_flags, struct ctdb_db_context **out) { TALLOC_CTX *mem_ctx; struct tevent_req *req; bool status; int ret; mem_ctx = talloc_new(client); if (mem_ctx == NULL) { return ENOMEM; } req = ctdb_attach_send(mem_ctx, ev, client, timeout, db_name, db_flags); if (req == NULL) { talloc_free(mem_ctx); return ENOMEM; } tevent_req_poll(req, ev); status = ctdb_attach_recv(req, &ret, out); if (! status) { talloc_free(mem_ctx); return ret; } /* ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func); ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func); ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func); */ talloc_free(mem_ctx); return 0; } struct ctdb_detach_state { struct ctdb_client_context *client; struct tevent_context *ev; struct timeval timeout; uint32_t db_id; const char *db_name; }; static void ctdb_detach_dbname_done(struct tevent_req *subreq); static void ctdb_detach_done(struct tevent_req *subreq); struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct timeval timeout, uint32_t db_id) { struct tevent_req *req, *subreq; struct ctdb_detach_state *state; struct ctdb_req_control request; req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state); if (req == NULL) { return NULL; } state->client = client; state->ev = ev; state->timeout = timeout; state->db_id = db_id; ctdb_req_control_get_dbname(&request, db_id); subreq = ctdb_client_control_send(state, ev, client, ctdb_client_pnn(client), timeout, &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req); return req; } static void ctdb_detach_dbname_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_detach_state *state = tevent_req_data( req, struct ctdb_detach_state); struct ctdb_reply_control *reply; struct ctdb_req_control request; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n", state->db_id, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name); if (ret != 0) { DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n", state->db_id, ret)); tevent_req_error(req, ret); return; } ctdb_req_control_db_detach(&request, state->db_id); subreq = ctdb_client_control_send(state, state->ev, state->client, ctdb_client_pnn(state->client), state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_detach_done, req); } static void ctdb_detach_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_detach_state *state = tevent_req_data( req, struct ctdb_detach_state); struct ctdb_reply_control *reply; struct ctdb_db_context *db; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n", state->db_name, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_db_detach(reply); if (ret != 0) { DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n", state->db_name, ret)); tevent_req_error(req, ret); return; } db = client_db_handle(state->client, state->db_name); if (db != NULL) { DLIST_REMOVE(state->client->db, db); TALLOC_FREE(db); } tevent_req_done(req); } bool ctdb_detach_recv(struct tevent_req *req, int *perr) { int ret; if (tevent_req_is_unix_error(req, &ret)) { if (perr != NULL) { *perr = ret; } return false; } return true; } int ctdb_detach(struct tevent_context *ev, struct ctdb_client_context *client, struct timeval timeout, uint32_t db_id) { TALLOC_CTX *mem_ctx; struct tevent_req *req; int ret; bool status; mem_ctx = talloc_new(client); if (mem_ctx == NULL) { return ENOMEM; } req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id); if (req == NULL) { talloc_free(mem_ctx); return ENOMEM; } tevent_req_poll(req, ev); status = ctdb_detach_recv(req, &ret); if (! status) { talloc_free(mem_ctx); return ret; } talloc_free(mem_ctx); return 0; } uint32_t ctdb_db_id(struct ctdb_db_context *db) { return db->db_id; } struct ctdb_db_traverse_local_state { ctdb_rec_parser_func_t parser; void *private_data; bool extract_header; int error; }; static int ctdb_db_traverse_local_handler(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data) { struct ctdb_db_traverse_local_state *state = (struct ctdb_db_traverse_local_state *)private_data; int ret; if (state->extract_header) { struct ctdb_ltdb_header header; ret = ctdb_ltdb_header_extract(&data, &header); if (ret != 0) { state->error = ret; return 1; } ret = state->parser(0, &header, key, data, state->private_data); } else { ret = state->parser(0, NULL, key, data, state->private_data); } if (ret != 0) { state->error = ret; return 1; } return 0; } int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly, bool extract_header, ctdb_rec_parser_func_t parser, void *private_data) { struct ctdb_db_traverse_local_state state; int ret; state.parser = parser; state.private_data = private_data; state.extract_header = extract_header; state.error = 0; if (readonly) { ret = tdb_traverse_read(db->ltdb->tdb, ctdb_db_traverse_local_handler, &state); } else { ret = tdb_traverse(db->ltdb->tdb, ctdb_db_traverse_local_handler, &state); } if (ret == -1) { return EIO; } return state.error; } struct ctdb_db_traverse_state { struct tevent_context *ev; struct ctdb_client_context *client; struct ctdb_db_context *db; uint32_t destnode; uint64_t srvid; struct timeval timeout; ctdb_rec_parser_func_t parser; void *private_data; int result; }; static void ctdb_db_traverse_handler_set(struct tevent_req *subreq); static void ctdb_db_traverse_started(struct tevent_req *subreq); static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data); static void ctdb_db_traverse_remove_handler(struct tevent_req *req); static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq); struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct ctdb_db_context *db, uint32_t destnode, struct timeval timeout, ctdb_rec_parser_func_t parser, void *private_data) { struct tevent_req *req, *subreq; struct ctdb_db_traverse_state *state; req = tevent_req_create(mem_ctx, &state, struct ctdb_db_traverse_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->db = db; state->destnode = destnode; state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid(); state->timeout = timeout; state->parser = parser; state->private_data = private_data; subreq = ctdb_client_set_message_handler_send(state, ev, client, state->srvid, ctdb_db_traverse_handler, req); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req); return req; } static void ctdb_db_traverse_handler_set(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_db_traverse_state *state = tevent_req_data( req, struct ctdb_db_traverse_state); struct ctdb_traverse_start_ext traverse; struct ctdb_req_control request; int ret = 0; bool status; status = ctdb_client_set_message_handler_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { tevent_req_error(req, ret); return; } traverse = (struct ctdb_traverse_start_ext) { .db_id = ctdb_db_id(state->db), .reqid = 0, .srvid = state->srvid, .withemptyrecords = false, }; ctdb_req_control_traverse_start_ext(&request, &traverse); subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, state->timeout, &request); if (subreq == NULL) { state->result = ENOMEM; ctdb_db_traverse_remove_handler(req); return; } tevent_req_set_callback(subreq, ctdb_db_traverse_started, req); } static void ctdb_db_traverse_started(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_db_traverse_state *state = tevent_req_data( req, struct ctdb_db_traverse_state); struct ctdb_reply_control *reply; int ret = 0; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret)); state->result = ret; ctdb_db_traverse_remove_handler(req); return; } ret = ctdb_reply_control_traverse_start_ext(reply); talloc_free(reply); if (ret != 0) { DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n", ret)); state->result = ret; ctdb_db_traverse_remove_handler(req); return; } } static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data) { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); struct ctdb_db_traverse_state *state = tevent_req_data( req, struct ctdb_db_traverse_state); struct ctdb_rec_data *rec; struct ctdb_ltdb_header header; size_t np; int ret; ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np); if (ret != 0) { return; } if (rec->key.dsize == 0 && rec->data.dsize == 0) { talloc_free(rec); ctdb_db_traverse_remove_handler(req); return; } ret = ctdb_ltdb_header_extract(&rec->data, &header); if (ret != 0) { talloc_free(rec); return; } if (rec->data.dsize == 0) { talloc_free(rec); return; } ret = state->parser(rec->reqid, &header, rec->key, rec->data, state->private_data); talloc_free(rec); if (ret != 0) { state->result = ret; ctdb_db_traverse_remove_handler(req); } } static void ctdb_db_traverse_remove_handler(struct tevent_req *req) { struct ctdb_db_traverse_state *state = tevent_req_data( req, struct ctdb_db_traverse_state); struct tevent_req *subreq; subreq = ctdb_client_remove_message_handler_send(state, state->ev, state->client, state->srvid, req); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req); } static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_db_traverse_state *state = tevent_req_data( req, struct ctdb_db_traverse_state); int ret; bool status; status = ctdb_client_remove_message_handler_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { tevent_req_error(req, ret); return; } if (state->result != 0) { tevent_req_error(req, state->result); return; } tevent_req_done(req); } bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr) { int ret; if (tevent_req_is_unix_error(req, &ret)) { if (perr != NULL) { *perr = ret; } return false; } return true; } int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct ctdb_db_context *db, uint32_t destnode, struct timeval timeout, ctdb_rec_parser_func_t parser, void *private_data) { struct tevent_req *req; int ret = 0; bool status; req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode, timeout, parser, private_data); if (req == NULL) { return ENOMEM; } tevent_req_poll(req, ev); status = ctdb_db_traverse_recv(req, &ret); if (! status) { return ret; } return 0; } int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key, struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx, TDB_DATA *data) { TDB_DATA rec; size_t np; int ret; rec = tdb_fetch(db->ltdb->tdb, key); if (rec.dsize < sizeof(struct ctdb_ltdb_header)) { /* No record present */ if (rec.dptr != NULL) { free(rec.dptr); } if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) { return EIO; } *header = (struct ctdb_ltdb_header) { .dmaster = CTDB_UNKNOWN_PNN, }; if (data != NULL) { *data = tdb_null; } return 0; } ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np); if (ret != 0) { return ret; } ret = 0; if (data != NULL) { data->dsize = rec.dsize - np; data->dptr = talloc_memdup(mem_ctx, rec.dptr + np, data->dsize); if (data->dptr == NULL) { ret = ENOMEM; } } free(rec.dptr); return ret; } /* * Fetch a record from volatile database * * Steps: * 1. Get a lock on the hash chain * 2. If the record does not exist, migrate the record * 3. If readonly=true and delegations do not exist, migrate the record. * 4. If readonly=false and delegations exist, migrate the record. * 5. If the local node is not dmaster, migrate the record. * 6. Return record */ struct ctdb_fetch_lock_state { struct tevent_context *ev; struct ctdb_client_context *client; struct ctdb_record_handle *h; bool readonly; uint32_t pnn; }; static int ctdb_fetch_lock_check(struct tevent_req *req); static void ctdb_fetch_lock_migrate(struct tevent_req *req); static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq); struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct ctdb_db_context *db, TDB_DATA key, bool readonly) { struct ctdb_fetch_lock_state *state; struct tevent_req *req; int ret; req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->h = talloc_zero(db, struct ctdb_record_handle); if (tevent_req_nomem(state->h, req)) { return tevent_req_post(req, ev); } state->h->ev = ev; state->h->client = client; state->h->db = db; state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize); if (tevent_req_nomem(state->h->key.dptr, req)) { return tevent_req_post(req, ev); } state->h->key.dsize = key.dsize; state->h->readonly = false; state->readonly = readonly; state->pnn = ctdb_client_pnn(client); /* Check that database is not persistent */ if (! ctdb_db_volatile(db)) { DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n", db->db_name)); tevent_req_error(req, EINVAL); return tevent_req_post(req, ev); } ret = ctdb_fetch_lock_check(req); if (ret == 0) { tevent_req_done(req); return tevent_req_post(req, ev); } if (ret != EAGAIN) { tevent_req_error(req, ret); return tevent_req_post(req, ev); } return req; } static int ctdb_fetch_lock_check(struct tevent_req *req) { struct ctdb_fetch_lock_state *state = tevent_req_data( req, struct ctdb_fetch_lock_state); struct ctdb_record_handle *h = state->h; struct ctdb_ltdb_header header; TDB_DATA data = tdb_null; size_t np; int ret, err = 0; bool do_migrate = false; ret = tdb_chainlock(h->db->ltdb->tdb, h->key); if (ret != 0) { DEBUG(DEBUG_ERR, ("fetch_lock: %s tdb_chainlock failed, %s\n", h->db->db_name, tdb_errorstr(h->db->ltdb->tdb))); err = EIO; goto failed; } data = tdb_fetch(h->db->ltdb->tdb, h->key); if (data.dptr == NULL) { if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) { goto migrate; } else { err = EIO; goto failed; } } /* Got the record */ ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np); if (ret != 0) { err = ret; goto failed; } if (! state->readonly) { /* Read/write access */ if (header.dmaster == state->pnn && header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) { goto migrate; } if (header.dmaster != state->pnn) { goto migrate; } } else { /* Readonly access */ if (header.dmaster != state->pnn && ! (header.flags & (CTDB_REC_RO_HAVE_READONLY | CTDB_REC_RO_HAVE_DELEGATIONS))) { goto migrate; } } /* We are the dmaster or readonly delegation */ h->header = header; h->data = data; if (header.flags & (CTDB_REC_RO_HAVE_READONLY | CTDB_REC_RO_HAVE_DELEGATIONS)) { h->readonly = true; } return 0; migrate: do_migrate = true; err = EAGAIN; failed: if (data.dptr != NULL) { free(data.dptr); } ret = tdb_chainunlock(h->db->ltdb->tdb, h->key); if (ret != 0) { DEBUG(DEBUG_ERR, ("fetch_lock: %s tdb_chainunlock failed, %s\n", h->db->db_name, tdb_errorstr(h->db->ltdb->tdb))); return EIO; } if (do_migrate) { ctdb_fetch_lock_migrate(req); } return err; } static void ctdb_fetch_lock_migrate(struct tevent_req *req) { struct ctdb_fetch_lock_state *state = tevent_req_data( req, struct ctdb_fetch_lock_state); struct ctdb_req_call request; struct tevent_req *subreq; ZERO_STRUCT(request); request.flags = CTDB_IMMEDIATE_MIGRATION; if (state->readonly) { request.flags |= CTDB_WANT_READONLY; } request.db_id = state->h->db->db_id; request.callid = CTDB_NULL_FUNC; request.key = state->h->key; request.calldata = tdb_null; subreq = ctdb_client_call_send(state, state->ev, state->client, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req); } static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_fetch_lock_state *state = tevent_req_data( req, struct ctdb_fetch_lock_state); struct ctdb_reply_call *reply; int ret; bool status; status = ctdb_client_call_recv(subreq, state, &reply, &ret); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n", state->h->db->db_name, ret)); tevent_req_error(req, ret); return; } if (reply->status != 0) { tevent_req_error(req, EIO); return; } talloc_free(reply); ret = ctdb_fetch_lock_check(req); if (ret != 0) { if (ret != EAGAIN) { tevent_req_error(req, ret); } return; } tevent_req_done(req); } static int ctdb_record_handle_destructor(struct ctdb_record_handle *h) { int ret; ret = tdb_chainunlock(h->db->ltdb->tdb, h->key); if (ret != 0) { DEBUG(DEBUG_ERR, ("fetch_lock: %s tdb_chainunlock failed, %s\n", h->db->db_name, tdb_errorstr(h->db->ltdb->tdb))); } free(h->data.dptr); return 0; } struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req, struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx, TDB_DATA *data, int *perr) { struct ctdb_fetch_lock_state *state = tevent_req_data( req, struct ctdb_fetch_lock_state); struct ctdb_record_handle *h = state->h; int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { TALLOC_FREE(state->h); *perr = err; } return NULL; } if (header != NULL) { *header = h->header; } if (data != NULL) { size_t offset; offset = ctdb_ltdb_header_len(&h->header); data->dsize = h->data.dsize - offset; if (data->dsize == 0) { data->dptr = NULL; } else { data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset, data->dsize); if (data->dptr == NULL) { TALLOC_FREE(state->h); if (perr != NULL) { *perr = ENOMEM; } return NULL; } } } talloc_set_destructor(h, ctdb_record_handle_destructor); return h; } int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct ctdb_db_context *db, TDB_DATA key, bool readonly, struct ctdb_record_handle **out, struct ctdb_ltdb_header *header, TDB_DATA *data) { struct tevent_req *req; struct ctdb_record_handle *h; int ret; req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly); if (req == NULL) { return ENOMEM; } tevent_req_poll(req, ev); h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret); if (h == NULL) { return ret; } *out = h; return 0; } int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data) { uint8_t header[sizeof(struct ctdb_ltdb_header)]; TDB_DATA rec[2]; size_t np; int ret; /* Cannot modify the record if it was obtained as a readonly copy */ if (h->readonly) { return EINVAL; } /* Check if the new data is same */ if (h->data.dsize == data.dsize && memcmp(h->data.dptr, data.dptr, data.dsize) == 0) { /* No need to do anything */ return 0; } ctdb_ltdb_header_push(&h->header, header, &np); rec[0].dsize = np; rec[0].dptr = header; rec[1].dsize = data.dsize; rec[1].dptr = data.dptr; ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE); if (ret != 0) { DEBUG(DEBUG_ERR, ("store_record: %s tdb_storev failed, %s\n", h->db->db_name, tdb_errorstr(h->db->ltdb->tdb))); return EIO; } return 0; } struct ctdb_delete_record_state { struct ctdb_record_handle *h; }; static void ctdb_delete_record_done(struct tevent_req *subreq); struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_record_handle *h) { struct tevent_req *req, *subreq; struct ctdb_delete_record_state *state; struct ctdb_key_data key; struct ctdb_req_control request; uint8_t header[sizeof(struct ctdb_ltdb_header)]; TDB_DATA rec; size_t np; int ret; req = tevent_req_create(mem_ctx, &state, struct ctdb_delete_record_state); if (req == NULL) { return NULL; } state->h = h; /* Cannot delete the record if it was obtained as a readonly copy */ if (h->readonly) { DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n", h->db->db_name)); tevent_req_error(req, EINVAL); return tevent_req_post(req, ev); } ctdb_ltdb_header_push(&h->header, header, &np); rec.dsize = np; rec.dptr = header; ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE); if (ret != 0) { DEBUG(DEBUG_ERR, ("fetch_lock delete: %s tdb_sore failed, %s\n", h->db->db_name, tdb_errorstr(h->db->ltdb->tdb))); tevent_req_error(req, EIO); return tevent_req_post(req, ev); } key.db_id = h->db->db_id; key.header = h->header; key.key = h->key; ctdb_req_control_schedule_for_deletion(&request, &key); subreq = ctdb_client_control_send(state, ev, h->client, ctdb_client_pnn(h->client), tevent_timeval_zero(), &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_delete_record_done, req); return req; } static void ctdb_delete_record_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_delete_record_state *state = tevent_req_data( req, struct ctdb_delete_record_state); int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, NULL, NULL); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("delete_record: %s SCHDULE_FOR_DELETION failed, " "ret=%d\n", state->h->db->db_name, ret)); tevent_req_error(req, ret); return; } tevent_req_done(req); } bool ctdb_delete_record_recv(struct tevent_req *req, int *perr) { int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } int ctdb_delete_record(struct ctdb_record_handle *h) { struct tevent_context *ev = h->ev; TALLOC_CTX *mem_ctx; struct tevent_req *req; int ret; bool status; mem_ctx = talloc_new(NULL); if (mem_ctx == NULL) { return ENOMEM; } req = ctdb_delete_record_send(mem_ctx, ev, h); if (req == NULL) { talloc_free(mem_ctx); return ENOMEM; } tevent_req_poll(req, ev); status = ctdb_delete_record_recv(req, &ret); talloc_free(mem_ctx); if (! status) { return ret; } return 0; } /* * Global lock functions */ struct ctdb_g_lock_lock_state { struct tevent_context *ev; struct ctdb_client_context *client; struct ctdb_db_context *db; TDB_DATA key; struct ctdb_server_id my_sid; enum ctdb_g_lock_type lock_type; struct ctdb_record_handle *h; /* state for verification of active locks */ struct ctdb_g_lock_list *lock_list; unsigned int current; }; static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq); static void ctdb_g_lock_lock_process_locks(struct tevent_req *req); static void ctdb_g_lock_lock_checked(struct tevent_req *subreq); static int ctdb_g_lock_lock_update(struct tevent_req *req); static void ctdb_g_lock_lock_retry(struct tevent_req *subreq); static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1, enum ctdb_g_lock_type l2) { if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) { return false; } return true; } struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct ctdb_db_context *db, const char *keyname, struct ctdb_server_id *sid, bool readonly) { struct tevent_req *req, *subreq; struct ctdb_g_lock_lock_state *state; req = tevent_req_create(mem_ctx, &state, struct ctdb_g_lock_lock_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->db = db; state->key.dptr = discard_const(keyname); state->key.dsize = strlen(keyname) + 1; state->my_sid = *sid; state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE); subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key, false); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req); return req; } static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_g_lock_lock_state *state = tevent_req_data( req, struct ctdb_g_lock_lock_state); TDB_DATA data; size_t np; int ret = 0; state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret); TALLOC_FREE(subreq); if (state->h == NULL) { DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n", (char *)state->key.dptr)); tevent_req_error(req, ret); return; } if (state->lock_list != NULL) { TALLOC_FREE(state->lock_list); state->current = 0; } ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state, &state->lock_list, &np); talloc_free(data.dptr); if (ret != 0) { DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n", (char *)state->key.dptr)); tevent_req_error(req, ret); return; } ctdb_g_lock_lock_process_locks(req); } static void ctdb_g_lock_lock_process_locks(struct tevent_req *req) { struct ctdb_g_lock_lock_state *state = tevent_req_data( req, struct ctdb_g_lock_lock_state); struct tevent_req *subreq; struct ctdb_g_lock *lock; bool check_server = false; int ret; while (state->current < state->lock_list->num) { lock = &state->lock_list->lock[state->current]; /* We should not ask for the same lock more than once */ if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) { DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n", (char *)state->key.dptr)); tevent_req_error(req, EDEADLK); return; } if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) { check_server = true; break; } state->current += 1; } if (check_server) { struct ctdb_req_control request; ctdb_req_control_process_exists(&request, lock->sid.pid); subreq = ctdb_client_control_send(state, state->ev, state->client, lock->sid.vnn, tevent_timeval_zero(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req); return; } /* There is no conflict, add ourself to the lock_list */ state->lock_list->lock = talloc_realloc(state->lock_list, state->lock_list->lock, struct ctdb_g_lock, state->lock_list->num + 1); if (state->lock_list->lock == NULL) { tevent_req_error(req, ENOMEM); return; } lock = &state->lock_list->lock[state->lock_list->num]; lock->type = state->lock_type; lock->sid = state->my_sid; state->lock_list->num += 1; ret = ctdb_g_lock_lock_update(req); if (ret != 0) { tevent_req_error(req, ret); return; } TALLOC_FREE(state->h); tevent_req_done(req); } static void ctdb_g_lock_lock_checked(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_g_lock_lock_state *state = tevent_req_data( req, struct ctdb_g_lock_lock_state); struct ctdb_reply_control *reply; int ret, value; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n", (char *)state->key.dptr, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_process_exists(reply, &value); if (ret != 0) { tevent_req_error(req, ret); return; } talloc_free(reply); if (value == 0) { /* server process exists, need to retry */ TALLOC_FREE(state->h); subreq = tevent_wakeup_send(state, state->ev, tevent_timeval_current_ofs(0,1000)); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req); return; } /* server process does not exist, remove conflicting entry */ state->lock_list->lock[state->current] = state->lock_list->lock[state->lock_list->num-1]; state->lock_list->num -= 1; ret = ctdb_g_lock_lock_update(req); if (ret != 0) { tevent_req_error(req, ret); return; } ctdb_g_lock_lock_process_locks(req); } static int ctdb_g_lock_lock_update(struct tevent_req *req) { struct ctdb_g_lock_lock_state *state = tevent_req_data( req, struct ctdb_g_lock_lock_state); TDB_DATA data; size_t np; int ret; data.dsize = ctdb_g_lock_list_len(state->lock_list); data.dptr = talloc_size(state, data.dsize); if (data.dptr == NULL) { return ENOMEM; } ctdb_g_lock_list_push(state->lock_list, data.dptr, &np); ret = ctdb_store_record(state->h, data); talloc_free(data.dptr); return ret; } static void ctdb_g_lock_lock_retry(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_g_lock_lock_state *state = tevent_req_data( req, struct ctdb_g_lock_lock_state); bool success; success = tevent_wakeup_recv(subreq); TALLOC_FREE(subreq); if (! success) { tevent_req_error(req, ENOMEM); return; } subreq = ctdb_fetch_lock_send(state, state->ev, state->client, state->db, state->key, false); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req); } bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr) { struct ctdb_g_lock_lock_state *state = tevent_req_data( req, struct ctdb_g_lock_lock_state); int err; TALLOC_FREE(state->h); if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } struct ctdb_g_lock_unlock_state { struct tevent_context *ev; struct ctdb_client_context *client; struct ctdb_db_context *db; TDB_DATA key; struct ctdb_server_id my_sid; struct ctdb_record_handle *h; struct ctdb_g_lock_list *lock_list; }; static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq); static int ctdb_g_lock_unlock_update(struct tevent_req *req); static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq); struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct ctdb_db_context *db, const char *keyname, struct ctdb_server_id sid) { struct tevent_req *req, *subreq; struct ctdb_g_lock_unlock_state *state; req = tevent_req_create(mem_ctx, &state, struct ctdb_g_lock_unlock_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->db = db; state->key.dptr = discard_const(keyname); state->key.dsize = strlen(keyname) + 1; state->my_sid = sid; subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key, false); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req); return req; } static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_g_lock_unlock_state *state = tevent_req_data( req, struct ctdb_g_lock_unlock_state); TDB_DATA data; size_t np; int ret = 0; state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret); TALLOC_FREE(subreq); if (state->h == NULL) { DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n", (char *)state->key.dptr)); tevent_req_error(req, ret); return; } ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state, &state->lock_list, &np); if (ret != 0) { DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n", (char *)state->key.dptr)); tevent_req_error(req, ret); return; } ret = ctdb_g_lock_unlock_update(req); if (ret != 0) { tevent_req_error(req, ret); return; } if (state->lock_list->num == 0) { subreq = ctdb_delete_record_send(state, state->ev, state->h); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted, req); return; } TALLOC_FREE(state->h); tevent_req_done(req); } static int ctdb_g_lock_unlock_update(struct tevent_req *req) { struct ctdb_g_lock_unlock_state *state = tevent_req_data( req, struct ctdb_g_lock_unlock_state); struct ctdb_g_lock *lock; int ret, i; for (i=0; ilock_list->num; i++) { lock = &state->lock_list->lock[i]; if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) { break; } } if (i < state->lock_list->num) { state->lock_list->lock[i] = state->lock_list->lock[state->lock_list->num-1]; state->lock_list->num -= 1; } if (state->lock_list->num != 0) { TDB_DATA data; size_t np; data.dsize = ctdb_g_lock_list_len(state->lock_list); data.dptr = talloc_size(state, data.dsize); if (data.dptr == NULL) { return ENOMEM; } ctdb_g_lock_list_push(state->lock_list, data.dptr, &np); ret = ctdb_store_record(state->h, data); talloc_free(data.dptr); if (ret != 0) { return ret; } } return 0; } static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_g_lock_unlock_state *state = tevent_req_data( req, struct ctdb_g_lock_unlock_state); int ret; bool status; status = ctdb_delete_record_recv(subreq, &ret); if (! status) { DEBUG(DEBUG_ERR, ("g_lock_unlock %s delete record failed, ret=%d\n", (char *)state->key.dptr, ret)); tevent_req_error(req, ret); return; } TALLOC_FREE(state->h); tevent_req_done(req); } bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr) { struct ctdb_g_lock_unlock_state *state = tevent_req_data( req, struct ctdb_g_lock_unlock_state); int err; TALLOC_FREE(state->h); if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } /* * Persistent database functions */ struct ctdb_transaction_start_state { struct tevent_context *ev; struct ctdb_client_context *client; struct timeval timeout; struct ctdb_transaction_handle *h; uint32_t destnode; }; static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq); static void ctdb_transaction_g_lock_done(struct tevent_req *subreq); struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct timeval timeout, struct ctdb_db_context *db, bool readonly) { struct ctdb_transaction_start_state *state; struct tevent_req *req, *subreq; struct ctdb_transaction_handle *h; req = tevent_req_create(mem_ctx, &state, struct ctdb_transaction_start_state); if (req == NULL) { return NULL; } if (ctdb_db_volatile(db)) { tevent_req_error(req, EINVAL); return tevent_req_post(req, ev); } state->ev = ev; state->client = client; state->destnode = ctdb_client_pnn(client); h = talloc_zero(db, struct ctdb_transaction_handle); if (tevent_req_nomem(h, req)) { return tevent_req_post(req, ev); } h->ev = ev; h->client = client; h->db = db; h->readonly = readonly; h->updated = false; /* SRVID is unique for databases, so client can have transactions * active for multiple databases */ h->sid = ctdb_client_get_server_id(client, db->db_id); h->recbuf = ctdb_rec_buffer_init(h, db->db_id); if (tevent_req_nomem(h->recbuf, req)) { return tevent_req_post(req, ev); } h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id); if (tevent_req_nomem(h->lock_name, req)) { return tevent_req_post(req, ev); } state->h = h; subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req); return req; } static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_transaction_start_state *state = tevent_req_data( req, struct ctdb_transaction_start_state); bool status; int ret; status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("transaction_start: %s attach g_lock.tdb failed\n", state->h->db->db_name)); tevent_req_error(req, ret); return; } subreq = ctdb_g_lock_lock_send(state, state->ev, state->client, state->h->db_g_lock, state->h->lock_name, &state->h->sid, state->h->readonly); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req); } static void ctdb_transaction_g_lock_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_transaction_start_state *state = tevent_req_data( req, struct ctdb_transaction_start_state); int ret; bool status; status = ctdb_g_lock_lock_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("transaction_start: %s g_lock lock failed, ret=%d\n", state->h->db->db_name, ret)); tevent_req_error(req, ret); return; } tevent_req_done(req); } struct ctdb_transaction_handle *ctdb_transaction_start_recv( struct tevent_req *req, int *perr) { struct ctdb_transaction_start_state *state = tevent_req_data( req, struct ctdb_transaction_start_state); int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return NULL; } return state->h; } int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct timeval timeout, struct ctdb_db_context *db, bool readonly, struct ctdb_transaction_handle **out) { struct tevent_req *req; struct ctdb_transaction_handle *h; int ret; req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db, readonly); if (req == NULL) { return ENOMEM; } tevent_req_poll(req, ev); h = ctdb_transaction_start_recv(req, &ret); if (h == NULL) { return ret; } *out = h; return 0; } struct ctdb_transaction_record_fetch_state { TDB_DATA key, data; struct ctdb_ltdb_header header; bool found; }; static int ctdb_transaction_record_fetch_traverse( uint32_t reqid, struct ctdb_ltdb_header *nullheader, TDB_DATA key, TDB_DATA data, void *private_data) { struct ctdb_transaction_record_fetch_state *state = (struct ctdb_transaction_record_fetch_state *)private_data; if (state->key.dsize == key.dsize && memcmp(state->key.dptr, key.dptr, key.dsize) == 0) { int ret; ret = ctdb_ltdb_header_extract(&data, &state->header); if (ret != 0) { DEBUG(DEBUG_ERR, ("record_fetch: Failed to extract header, " "ret=%d\n", ret)); return 1; } state->data = data; state->found = true; } return 0; } static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data) { struct ctdb_transaction_record_fetch_state state; int ret; state.key = key; state.found = false; ret = ctdb_rec_buffer_traverse(h->recbuf, ctdb_transaction_record_fetch_traverse, &state); if (ret != 0) { return ret; } if (state.found) { if (header != NULL) { *header = state.header; } if (data != NULL) { *data = state.data; } return 0; } return ENOENT; } int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h, TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data) { TDB_DATA tmp_data; struct ctdb_ltdb_header header; int ret; ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data); if (ret == 0) { data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr, tmp_data.dsize); if (data->dptr == NULL) { return ENOMEM; } data->dsize = tmp_data.dsize; return 0; } ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data); if (ret != 0) { return ret; } ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data); if (ret != 0) { return ret; } return 0; } int ctdb_transaction_store_record(struct ctdb_transaction_handle *h, TDB_DATA key, TDB_DATA data) { TALLOC_CTX *tmp_ctx; struct ctdb_ltdb_header header; TDB_DATA old_data; int ret; if (h->readonly) { return EINVAL; } tmp_ctx = talloc_new(h); if (tmp_ctx == NULL) { return ENOMEM; } ret = ctdb_transaction_record_fetch(h, key, &header, &old_data); if (ret != 0) { ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data); if (ret != 0) { return ret; } } if (old_data.dsize == data.dsize && memcmp(old_data.dptr, data.dptr, data.dsize) == 0) { talloc_free(tmp_ctx); return 0; } header.dmaster = ctdb_client_pnn(h->client); header.rsn += 1; ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data); talloc_free(tmp_ctx); if (ret != 0) { return ret; } h->updated = true; return 0; } int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h, TDB_DATA key) { return ctdb_transaction_store_record(h, key, tdb_null); } static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h, uint64_t *seqnum) { const char *keyname = CTDB_DB_SEQNUM_KEY; TDB_DATA key, data; struct ctdb_ltdb_header header; int ret; key.dptr = discard_const(keyname); key.dsize = strlen(keyname) + 1; ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data); if (ret != 0) { DEBUG(DEBUG_ERR, ("transaction_commit: %s seqnum fetch failed, ret=%d\n", h->db->db_name, ret)); return ret; } if (data.dsize == 0) { /* initial data */ *seqnum = 0; return 0; } if (data.dsize != sizeof(uint64_t)) { talloc_free(data.dptr); return EINVAL; } *seqnum = *(uint64_t *)data.dptr; talloc_free(data.dptr); return 0; } static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h, uint64_t seqnum) { const char *keyname = CTDB_DB_SEQNUM_KEY; TDB_DATA key, data; key.dptr = discard_const(keyname); key.dsize = strlen(keyname) + 1; data.dptr = (uint8_t *)&seqnum; data.dsize = sizeof(seqnum); return ctdb_transaction_store_record(h, key, data); } struct ctdb_transaction_commit_state { struct tevent_context *ev; struct timeval timeout; struct ctdb_transaction_handle *h; uint64_t seqnum; }; static void ctdb_transaction_commit_done(struct tevent_req *subreq); static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq); struct tevent_req *ctdb_transaction_commit_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct timeval timeout, struct ctdb_transaction_handle *h) { struct tevent_req *req, *subreq; struct ctdb_transaction_commit_state *state; struct ctdb_req_control request; int ret; req = tevent_req_create(mem_ctx, &state, struct ctdb_transaction_commit_state); if (req == NULL) { return NULL; } state->ev = ev; state->timeout = timeout; state->h = h; ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum); if (ret != 0) { tevent_req_error(req, ret); return tevent_req_post(req, ev); } ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1); if (ret != 0) { tevent_req_error(req, ret); return tevent_req_post(req, ev); } ctdb_req_control_trans3_commit(&request, h->recbuf); subreq = ctdb_client_control_send(state, ev, h->client, ctdb_client_pnn(h->client), timeout, &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req); return req; } static void ctdb_transaction_commit_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_transaction_commit_state *state = tevent_req_data( req, struct ctdb_transaction_commit_state); struct ctdb_transaction_handle *h = state->h; struct ctdb_reply_control *reply; uint64_t seqnum; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n", h->db->db_name, ret)); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_trans3_commit(reply); talloc_free(reply); if (ret != 0) { /* Control failed due to recovery */ ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum); if (ret != 0) { tevent_req_error(req, ret); return; } if (seqnum == state->seqnum) { struct ctdb_req_control request; /* try again */ ctdb_req_control_trans3_commit(&request, state->h->recbuf); subreq = ctdb_client_control_send( state, state->ev, state->h->client, ctdb_client_pnn(state->h->client), state->timeout, &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req); return; } if (seqnum != state->seqnum + 1) { DEBUG(DEBUG_ERR, ("transaction_commit: %s seqnum mismatch " "0x%"PRIx64" != 0x%"PRIx64" + 1\n", state->h->db->db_name, seqnum, state->seqnum)); tevent_req_error(req, EIO); return; } } /* trans3_commit successful */ subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client, h->db_g_lock, h->lock_name, h->sid); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done, req); } static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_transaction_commit_state *state = tevent_req_data( req, struct ctdb_transaction_commit_state); int ret; bool status; status = ctdb_g_lock_unlock_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("transaction_commit: %s g_lock unlock failed, ret=%d\n", state->h->db->db_name, ret)); tevent_req_error(req, ret); return; } talloc_free(state->h); tevent_req_done(req); } bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr) { int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } int ctdb_transaction_commit(struct ctdb_transaction_handle *h) { struct tevent_context *ev = h->ev; TALLOC_CTX *mem_ctx; struct tevent_req *req; int ret; bool status; if (h->readonly || ! h->updated) { return ctdb_transaction_cancel(h); } mem_ctx = talloc_new(NULL); if (mem_ctx == NULL) { return ENOMEM; } req = ctdb_transaction_commit_send(mem_ctx, ev, tevent_timeval_zero(), h); if (req == NULL) { talloc_free(mem_ctx); return ENOMEM; } tevent_req_poll(req, ev); status = ctdb_transaction_commit_recv(req, &ret); if (! status) { talloc_free(mem_ctx); return ret; } talloc_free(mem_ctx); return 0; } struct ctdb_transaction_cancel_state { struct tevent_context *ev; struct ctdb_transaction_handle *h; struct timeval timeout; }; static void ctdb_transaction_cancel_done(struct tevent_req *subreq); struct tevent_req *ctdb_transaction_cancel_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct timeval timeout, struct ctdb_transaction_handle *h) { struct tevent_req *req, *subreq; struct ctdb_transaction_cancel_state *state; req = tevent_req_create(mem_ctx, &state, struct ctdb_transaction_cancel_state); if (req == NULL) { return NULL; } state->ev = ev; state->h = h; state->timeout = timeout; subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client, state->h->db_g_lock, state->h->lock_name, state->h->sid); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_transaction_cancel_done, req); return req; } static void ctdb_transaction_cancel_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_transaction_cancel_state *state = tevent_req_data( req, struct ctdb_transaction_cancel_state); int ret; bool status; status = ctdb_g_lock_unlock_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { DEBUG(DEBUG_ERR, ("transaction_cancel: %s g_lock unlock failed, ret=%d\n", state->h->db->db_name, ret)); talloc_free(state->h); tevent_req_error(req, ret); return; } talloc_free(state->h); tevent_req_done(req); } bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr) { int err; if (tevent_req_is_unix_error(req, &err)) { if (perr != NULL) { *perr = err; } return false; } return true; } int ctdb_transaction_cancel(struct ctdb_transaction_handle *h) { struct tevent_context *ev = h->ev; struct tevent_req *req; TALLOC_CTX *mem_ctx; int ret; bool status; mem_ctx = talloc_new(NULL); if (mem_ctx == NULL) { talloc_free(h); return ENOMEM; } req = ctdb_transaction_cancel_send(mem_ctx, ev, tevent_timeval_zero(), h); if (req == NULL) { talloc_free(mem_ctx); talloc_free(h); return ENOMEM; } tevent_req_poll(req, ev); status = ctdb_transaction_cancel_recv(req, &ret); if (! status) { talloc_free(mem_ctx); return ret; } talloc_free(mem_ctx); return 0; } /* * TODO: * * In future Samba should register SERVER_ID. * Make that structure same as struct srvid {}. */