diff options
author | Amitay Isaacs <amitay@gmail.com> | 2018-02-16 17:00:40 +1100 |
---|---|---|
committer | Karolin Seeger <kseeger@samba.org> | 2020-03-30 10:08:23 +0000 |
commit | 190b34ff2cfa4f529d1e37bbc995359fc4abac7d (patch) | |
tree | 33cfe083a2f1da113105ea2063aa4a24a63f0e69 /ctdb | |
parent | ee045a05acb5a2024a280469ea95c860bff8a5c9 (diff) | |
download | samba-190b34ff2cfa4f529d1e37bbc995359fc4abac7d.tar.gz |
ctdb-vacuum: Add processing of fetch queue
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
(cherry picked from commit 86521837b684df3b7c5a0a1e3b7e606c8b91f63e)
Diffstat (limited to 'ctdb')
-rw-r--r-- | ctdb/server/ctdb_vacuum.c | 192 |
1 files changed, 189 insertions, 3 deletions
diff --git a/ctdb/server/ctdb_vacuum.c b/ctdb/server/ctdb_vacuum.c index 6f28fa89cc9..410ef8bf722 100644 --- a/ctdb/server/ctdb_vacuum.c +++ b/ctdb/server/ctdb_vacuum.c @@ -317,6 +317,181 @@ static int delete_marshall_traverse(void *param, void *data) return 0; } +struct fetch_queue_state { + struct ctdb_db_context *ctdb_db; + int count; +}; + +struct fetch_record_migrate_state { + struct fetch_queue_state *fetch_queue; + TDB_DATA key; +}; + +static void fetch_record_migrate_callback(struct ctdb_client_call_state *state) +{ + struct fetch_record_migrate_state *fetch = talloc_get_type_abort( + state->async.private_data, struct fetch_record_migrate_state); + struct fetch_queue_state *fetch_queue = fetch->fetch_queue; + struct ctdb_ltdb_header hdr; + struct ctdb_call call = { 0 }; + int ret; + + ret = ctdb_call_recv(state, &call); + fetch_queue->count--; + if (ret != 0) { + D_ERR("Failed to migrate record for vacuuming\n"); + goto done; + } + + ret = tdb_chainlock_nonblock(fetch_queue->ctdb_db->ltdb->tdb, + fetch->key); + if (ret != 0) { + goto done; + } + + ret = tdb_parse_record(fetch_queue->ctdb_db->ltdb->tdb, + fetch->key, + vacuum_record_parser, + &hdr); + + tdb_chainunlock(fetch_queue->ctdb_db->ltdb->tdb, fetch->key); + + if (ret != 0) { + goto done; + } + + D_INFO("Vacuum Fetch record, key=%.*s\n", + (int)fetch->key.dsize, + fetch->key.dptr); + + (void) ctdb_local_schedule_for_deletion(fetch_queue->ctdb_db, + &hdr, + fetch->key); + +done: + talloc_free(fetch); +} + +static int fetch_record_parser(TDB_DATA key, TDB_DATA data, void *private_data) +{ + struct ctdb_ltdb_header *header = + (struct ctdb_ltdb_header *)private_data; + + if (data.dsize < sizeof(struct ctdb_ltdb_header)) { + return -1; + } + + memcpy(header, data.dptr, sizeof(*header)); + return 0; +} + +/** + * traverse function for the traversal of the fetch_queue. + * + * Send a record migration request. + */ +static int fetch_queue_traverse(void *param, void *data) +{ + struct fetch_record_data *rd = talloc_get_type_abort( + data, struct fetch_record_data); + struct fetch_queue_state *fetch_queue = + (struct fetch_queue_state *)param; + struct ctdb_db_context *ctdb_db = fetch_queue->ctdb_db; + struct ctdb_client_call_state *state; + struct fetch_record_migrate_state *fetch; + struct ctdb_call call = { 0 }; + struct ctdb_ltdb_header header; + int ret; + + ret = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, rd->key); + if (ret != 0) { + return 0; + } + + ret = tdb_parse_record(ctdb_db->ltdb->tdb, + rd->key, + fetch_record_parser, + &header); + + tdb_chainunlock(ctdb_db->ltdb->tdb, rd->key); + + if (ret != 0) { + goto skipped; + } + + if (header.dmaster == ctdb_db->ctdb->pnn) { + /* If the record is already migrated, skip */ + goto skipped; + } + + fetch = talloc_zero(ctdb_db, struct fetch_record_migrate_state); + if (fetch == NULL) { + D_ERR("Failed to setup fetch record migrate state\n"); + return 0; + } + + fetch->fetch_queue = fetch_queue; + + fetch->key.dsize = rd->key.dsize; + fetch->key.dptr = talloc_memdup(fetch, rd->key.dptr, rd->key.dsize); + if (fetch->key.dptr == NULL) { + D_ERR("Memory error in fetch_queue_traverse\n"); + talloc_free(fetch); + return 0; + } + + call.call_id = CTDB_NULL_FUNC; + call.flags = CTDB_IMMEDIATE_MIGRATION | + CTDB_CALL_FLAG_VACUUM_MIGRATION; + call.key = fetch->key; + + state = ctdb_call_send(ctdb_db, &call); + if (state == NULL) { + DEBUG(DEBUG_ERR, ("Failed to setup vacuum fetch call\n")); + talloc_free(fetch); + return 0; + } + + state->async.fn = fetch_record_migrate_callback; + state->async.private_data = fetch; + + fetch_queue->count++; + + return 0; + +skipped: + D_INFO("Skipped Fetch record, key=%.*s\n", + (int)rd->key.dsize, + rd->key.dptr); + return 0; +} + +/** + * Traverse the fetch. + * Records are migrated to the local node and + * added to delete queue for further processing. + */ +static void ctdb_process_fetch_queue(struct ctdb_db_context *ctdb_db) +{ + struct fetch_queue_state state; + int ret; + + state.ctdb_db = ctdb_db; + state.count = 0; + + ret = trbt_traversearray32(ctdb_db->fetch_queue, 1, + fetch_queue_traverse, &state); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " Error traversing " + "the fetch queue.\n")); + } + + /* Wait for all migrations to complete */ + while (state.count > 0) { + tevent_loop_once(ctdb_db->ctdb->ev); + } +} + /** * traverse function for the traversal of the delete_queue, * the fast-path vacuuming list. @@ -998,8 +1173,10 @@ fail: /** * Vacuum a DB: * - Always do the fast vacuuming run, which traverses - * the in-memory delete queue: these records have been - * scheduled for deletion. + * - the in-memory fetch queue: these records have been + * scheduled for migration + * - the in-memory delete queue: these records have been + * scheduled for deletion. * - Only if explicitly requested, the database is traversed * in order to use the traditional heuristics on empty records * to trigger deletion. @@ -1070,6 +1247,8 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, ctdb_vacuum_traverse_db(ctdb_db, vdata); } + ctdb_process_fetch_queue(ctdb_db); + ctdb_process_delete_queue(ctdb_db, vdata); ctdb_process_vacuum_fetch_lists(ctdb_db, vdata); @@ -1309,10 +1488,17 @@ static void ctdb_vacuum_event(struct tevent_context *ev, ctdb_db->delete_queue = trbt_create(ctdb_db, 0); if (ctdb_db->delete_queue == NULL) { /* fatal here? ... */ - ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree " + ctdb_fatal(ctdb, "Out of memory when re-creating delete queue " "in parent context. Shutting down\n"); } + talloc_free(ctdb_db->fetch_queue); + ctdb_db->fetch_queue = trbt_create(ctdb_db, 0); + if (ctdb_db->fetch_queue == NULL) { + ctdb_fatal(ctdb, "Out of memory when re-create fetch queue " + " in parent context. Shutting down\n"); + } + tevent_add_timer(ctdb->ev, child_ctx, timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0), vacuum_child_timeout, child_ctx); |