summaryrefslogtreecommitdiff
path: root/ctdb
diff options
context:
space:
mode:
authorAmitay Isaacs <amitay@gmail.com>2018-02-16 17:00:40 +1100
committerAmitay Isaacs <amitay@samba.org>2019-10-24 04:06:42 +0000
commit86521837b684df3b7c5a0a1e3b7e606c8b91f63e (patch)
tree8a90651a841cba630bf0e9acc16ed4fa7f6b8f6a /ctdb
parentda617f90d90151f955ee354c57bdc4bc6f6498f2 (diff)
downloadsamba-86521837b684df3b7c5a0a1e3b7e606c8b91f63e.tar.gz
ctdb-vacuum: Add processing of fetch queue
Signed-off-by: Amitay Isaacs <amitay@gmail.com> Reviewed-by: Martin Schwenke <martin@meltin.net>
Diffstat (limited to 'ctdb')
-rw-r--r--ctdb/server/ctdb_vacuum.c192
1 files changed, 189 insertions, 3 deletions
diff --git a/ctdb/server/ctdb_vacuum.c b/ctdb/server/ctdb_vacuum.c
index 09762c49795..bdb00c3df0b 100644
--- a/ctdb/server/ctdb_vacuum.c
+++ b/ctdb/server/ctdb_vacuum.c
@@ -317,6 +317,181 @@ static int delete_marshall_traverse(void *param, void *data)
return 0;
}
+struct fetch_queue_state {
+ struct ctdb_db_context *ctdb_db;
+ int count;
+};
+
+struct fetch_record_migrate_state {
+ struct fetch_queue_state *fetch_queue;
+ TDB_DATA key;
+};
+
+static void fetch_record_migrate_callback(struct ctdb_client_call_state *state)
+{
+ struct fetch_record_migrate_state *fetch = talloc_get_type_abort(
+ state->async.private_data, struct fetch_record_migrate_state);
+ struct fetch_queue_state *fetch_queue = fetch->fetch_queue;
+ struct ctdb_ltdb_header hdr;
+ struct ctdb_call call = { 0 };
+ int ret;
+
+ ret = ctdb_call_recv(state, &call);
+ fetch_queue->count--;
+ if (ret != 0) {
+ D_ERR("Failed to migrate record for vacuuming\n");
+ goto done;
+ }
+
+ ret = tdb_chainlock_nonblock(fetch_queue->ctdb_db->ltdb->tdb,
+ fetch->key);
+ if (ret != 0) {
+ goto done;
+ }
+
+ ret = tdb_parse_record(fetch_queue->ctdb_db->ltdb->tdb,
+ fetch->key,
+ vacuum_record_parser,
+ &hdr);
+
+ tdb_chainunlock(fetch_queue->ctdb_db->ltdb->tdb, fetch->key);
+
+ if (ret != 0) {
+ goto done;
+ }
+
+ D_INFO("Vacuum Fetch record, key=%.*s\n",
+ (int)fetch->key.dsize,
+ fetch->key.dptr);
+
+ (void) ctdb_local_schedule_for_deletion(fetch_queue->ctdb_db,
+ &hdr,
+ fetch->key);
+
+done:
+ talloc_free(fetch);
+}
+
+static int fetch_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
+{
+ struct ctdb_ltdb_header *header =
+ (struct ctdb_ltdb_header *)private_data;
+
+ if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ return -1;
+ }
+
+ memcpy(header, data.dptr, sizeof(*header));
+ return 0;
+}
+
+/**
+ * traverse function for the traversal of the fetch_queue.
+ *
+ * Send a record migration request.
+ */
+static int fetch_queue_traverse(void *param, void *data)
+{
+ struct fetch_record_data *rd = talloc_get_type_abort(
+ data, struct fetch_record_data);
+ struct fetch_queue_state *fetch_queue =
+ (struct fetch_queue_state *)param;
+ struct ctdb_db_context *ctdb_db = fetch_queue->ctdb_db;
+ struct ctdb_client_call_state *state;
+ struct fetch_record_migrate_state *fetch;
+ struct ctdb_call call = { 0 };
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ ret = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, rd->key);
+ if (ret != 0) {
+ return 0;
+ }
+
+ ret = tdb_parse_record(ctdb_db->ltdb->tdb,
+ rd->key,
+ fetch_record_parser,
+ &header);
+
+ tdb_chainunlock(ctdb_db->ltdb->tdb, rd->key);
+
+ if (ret != 0) {
+ goto skipped;
+ }
+
+ if (header.dmaster == ctdb_db->ctdb->pnn) {
+ /* If the record is already migrated, skip */
+ goto skipped;
+ }
+
+ fetch = talloc_zero(ctdb_db, struct fetch_record_migrate_state);
+ if (fetch == NULL) {
+ D_ERR("Failed to setup fetch record migrate state\n");
+ return 0;
+ }
+
+ fetch->fetch_queue = fetch_queue;
+
+ fetch->key.dsize = rd->key.dsize;
+ fetch->key.dptr = talloc_memdup(fetch, rd->key.dptr, rd->key.dsize);
+ if (fetch->key.dptr == NULL) {
+ D_ERR("Memory error in fetch_queue_traverse\n");
+ talloc_free(fetch);
+ return 0;
+ }
+
+ call.call_id = CTDB_NULL_FUNC;
+ call.flags = CTDB_IMMEDIATE_MIGRATION |
+ CTDB_CALL_FLAG_VACUUM_MIGRATION;
+ call.key = fetch->key;
+
+ state = ctdb_call_send(ctdb_db, &call);
+ if (state == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to setup vacuum fetch call\n"));
+ talloc_free(fetch);
+ return 0;
+ }
+
+ state->async.fn = fetch_record_migrate_callback;
+ state->async.private_data = fetch;
+
+ fetch_queue->count++;
+
+ return 0;
+
+skipped:
+ D_INFO("Skipped Fetch record, key=%.*s\n",
+ (int)rd->key.dsize,
+ rd->key.dptr);
+ return 0;
+}
+
+/**
+ * Traverse the fetch.
+ * Records are migrated to the local node and
+ * added to delete queue for further processing.
+ */
+static void ctdb_process_fetch_queue(struct ctdb_db_context *ctdb_db)
+{
+ struct fetch_queue_state state;
+ int ret;
+
+ state.ctdb_db = ctdb_db;
+ state.count = 0;
+
+ ret = trbt_traversearray32(ctdb_db->fetch_queue, 1,
+ fetch_queue_traverse, &state);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " Error traversing "
+ "the fetch queue.\n"));
+ }
+
+ /* Wait for all migrations to complete */
+ while (state.count > 0) {
+ tevent_loop_once(ctdb_db->ctdb->ev);
+ }
+}
+
/**
* traverse function for the traversal of the delete_queue,
* the fast-path vacuuming list.
@@ -998,8 +1173,10 @@ fail:
/**
* Vacuum a DB:
* - Always do the fast vacuuming run, which traverses
- * the in-memory delete queue: these records have been
- * scheduled for deletion.
+ * - the in-memory fetch queue: these records have been
+ * scheduled for migration
+ * - the in-memory delete queue: these records have been
+ * scheduled for deletion.
* - Only if explicitly requested, the database is traversed
* in order to use the traditional heuristics on empty records
* to trigger deletion.
@@ -1070,6 +1247,8 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
ctdb_vacuum_traverse_db(ctdb_db, vdata);
}
+ ctdb_process_fetch_queue(ctdb_db);
+
ctdb_process_delete_queue(ctdb_db, vdata);
ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
@@ -1310,10 +1489,17 @@ static void ctdb_vacuum_event(struct tevent_context *ev,
ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
if (ctdb_db->delete_queue == NULL) {
/* fatal here? ... */
- ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
+ ctdb_fatal(ctdb, "Out of memory when re-creating delete queue "
"in parent context. Shutting down\n");
}
+ talloc_free(ctdb_db->fetch_queue);
+ ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
+ if (ctdb_db->fetch_queue == NULL) {
+ ctdb_fatal(ctdb, "Out of memory when re-create fetch queue "
+ " in parent context. Shutting down\n");
+ }
+
tevent_add_timer(ctdb->ev, child_ctx,
timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
vacuum_child_timeout, child_ctx);