summaryrefslogtreecommitdiff
path: root/ctdb/server/ctdb_call.c
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb/server/ctdb_call.c')
-rw-r--r--ctdb/server/ctdb_call.c128
1 files changed, 127 insertions, 1 deletions
diff --git a/ctdb/server/ctdb_call.c b/ctdb/server/ctdb_call.c
index cbbfc59d633..956b3e87117 100644
--- a/ctdb/server/ctdb_call.c
+++ b/ctdb/server/ctdb_call.c
@@ -409,7 +409,125 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
}
}
+struct dmaster_defer_call {
+ struct dmaster_defer_call *next, *prev;
+ struct ctdb_context *ctdb;
+ struct ctdb_req_header *hdr;
+};
+
+struct dmaster_defer_queue {
+ struct dmaster_defer_call *deferred_calls;
+};
+static void dmaster_defer_reprocess(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval t,
+ void *private_data)
+{
+ struct dmaster_defer_call *call = talloc_get_type(
+ private_data, struct dmaster_defer_call);
+
+ ctdb_input_pkt(call->ctdb, call->hdr);
+ talloc_free(call);
+}
+
+static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
+{
+ while (ddq->deferred_calls != NULL) {
+ struct dmaster_defer_call *call = ddq->deferred_calls;
+
+ DLIST_REMOVE(ddq->deferred_calls, call);
+
+ talloc_steal(call->ctdb, call);
+ tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
+ dmaster_defer_reprocess, call);
+ }
+ return 0;
+}
+
+static void *insert_ddq_callback(void *parm, void *data)
+{
+ if (data) {
+ talloc_free(data);
+ }
+ return parm;
+}
+
+/**
+ * This function is used to reigster a key in database that needs to be updated.
+ * Any requests for that key should get deferred till this is completed.
+ */
+static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
+ struct ctdb_req_header *hdr,
+ TDB_DATA key)
+{
+ uint32_t *k;
+ struct dmaster_defer_queue *ddq;
+
+ k = ctdb_key_to_idkey(hdr, key);
+ if (k == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
+ return -1;
+ }
+
+ /* Already exists */
+ ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+ if (ddq != NULL) {
+ talloc_free(k);
+ return 0;
+ }
+
+ ddq = talloc(hdr, struct dmaster_defer_queue);
+ if (ddq == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
+ talloc_free(k);
+ return -1;
+ }
+ ddq->deferred_calls = NULL;
+
+ trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
+ insert_ddq_callback, ddq);
+ talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
+
+ talloc_free(k);
+ return 0;
+}
+
+static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
+ struct ctdb_req_header *hdr,
+ TDB_DATA key)
+{
+ struct dmaster_defer_queue *ddq;
+ struct dmaster_defer_call *call;
+ uint32_t *k;
+
+ k = ctdb_key_to_idkey(hdr, key);
+ if (k == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
+ return -1;
+ }
+
+ ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+ if (ddq == NULL) {
+ talloc_free(k);
+ return -1;
+ }
+
+ talloc_free(k);
+
+ call = talloc(ddq, struct dmaster_defer_call);
+ if (call == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
+ return -1;
+ }
+
+ call->ctdb = ctdb_db->ctdb;
+ call->hdr = talloc_steal(call, hdr);
+
+ DLIST_ADD_END(ddq->deferred_calls, call, NULL);
+
+ return 0;
+}
/*
called when a CTDB_REQ_DMASTER packet comes in
@@ -445,7 +563,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
c->db_id);
return;
}
-
+
+ dmaster_defer_setup(ctdb_db, hdr, key);
+
/* fetch the current record */
ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
ctdb_call_input_pkt, ctdb, false);
@@ -760,6 +880,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
}
}
+ if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
+ talloc_free(call);
+ return;
+ }
/* determine if we are the dmaster for this key. This also
fetches the record data (if any), thus avoiding a 2nd fetch of the data
@@ -1110,6 +1234,8 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
sizeof(record_flags));
}
+ dmaster_defer_setup(ctdb_db, hdr, key);
+
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
ctdb_call_input_pkt, ctdb, false);
if (ret == -2) {