summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Bartlett <abartlet@samba.org>2017-06-26 14:13:41 +1200
committerStefan Metzmacher <metze@samba.org>2017-07-02 17:35:20 +0200
commitd21d83219534782c5d75a707aab23a5d7e91ce56 (patch)
tree1974c97f883b2f7d9e85908e3bd1290412273b14
parent4894f47e2ec48dd36cac72608bfabde6fdfc7c6b (diff)
downloadsamba-d21d83219534782c5d75a707aab23a5d7e91ce56.tar.gz
dsdb: Teach the Samba partition module how to lock all the DB backends
The metadata partition (sam.ldb) lock is not enough to block another process in prepare_commit(), because prepare_commit() is a no-op, if nothing was changed in the specific backend. Signed-off-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Reviewed-by: Stefan Metzmacher <metze@samba.org>
-rw-r--r--selftest/knownfail.d/ldb-locking5
-rw-r--r--source4/dsdb/samdb/ldb_modules/partition.c176
2 files changed, 175 insertions, 6 deletions
diff --git a/selftest/knownfail.d/ldb-locking b/selftest/knownfail.d/ldb-locking
deleted file mode 100644
index 532bf82b0ad..00000000000
--- a/selftest/knownfail.d/ldb-locking
+++ /dev/null
@@ -1,5 +0,0 @@
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock1\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock1_config\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock2\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_full_db_lock2_config\(ad_dc_ntvfs:local\)
-samba.tests.dsdb.samba.tests.dsdb.DsdbTests.test_db_lock2\(ad_dc_ntvfs:local\)
diff --git a/source4/dsdb/samdb/ldb_modules/partition.c b/source4/dsdb/samdb/ldb_modules/partition.c
index 08a959cabb5..c304efa645d 100644
--- a/source4/dsdb/samdb/ldb_modules/partition.c
+++ b/source4/dsdb/samdb/ldb_modules/partition.c
@@ -814,7 +814,7 @@ static int partition_start_trans(struct ldb_module *module)
ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
}
- /* This order must match that in prepare_commit() */
+ /* This order must match that in prepare_commit() and read_lock() */
ret = ldb_next_start_trans(module);
if (ret != LDB_SUCCESS) {
return ret;
@@ -1144,6 +1144,178 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque
return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
}
+/* lock all the backends */
+static int partition_read_lock(struct ldb_module *module)
+{
+ int i;
+ int ret;
+ int ret2;
+ struct ldb_context *ldb = ldb_module_get_ctx(module);
+ struct partition_private_data *data = \
+ talloc_get_type(ldb_module_get_private(module),
+ struct partition_private_data);
+
+ if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
+ ldb_debug(ldb, LDB_DEBUG_TRACE,
+ "partition_read_lock() -> (metadata partition)");
+ }
+
+ /*
+ * It is important to only do this for LOCK because:
+ * - we don't want to unlock what we did not lock
+ *
+ * - we don't want to make a new lock on the sam.ldb
+ * (triggered inside this routine due to the seq num check)
+ * during an unlock phase as that will violate the lock
+ * ordering
+ */
+
+ /*
+ * This will lock the metadata partition (sam.ldb) and
+ * will also call event loops, so we do it before we
+ * get the whole db lock.
+ */
+ ret = partition_reload_if_required(module, data, NULL);
+ if (ret != LDB_SUCCESS) {
+ return ret;
+ }
+
+ /*
+ * This order must match that in prepare_commit(), start with
+ * the metadata partition (sam.ldb) lock
+ */
+ ret = ldb_next_read_lock(module);
+ if (ret != LDB_SUCCESS) {
+ ldb_debug_set(ldb,
+ LDB_DEBUG_FATAL,
+ "Failed to lock db: %s / %s for metadata partition",
+ ldb_errstring(ldb),
+ ldb_strerror(ret));
+
+ return ret;
+ }
+
+ /*
+ * The metadata partition (sam.ldb) lock is not
+ * enough to block another process in prepare_commit(),
+ * because prepare_commit() is a no-op, if nothing
+ * was changed in the specific backend.
+ *
+ * That means the following per partition locks are required.
+ */
+ for (i=0; data && data->partitions && data->partitions[i]; i++) {
+ if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
+ ldb_debug(ldb, LDB_DEBUG_TRACE,
+ "partition_read_lock() -> %s",
+ ldb_dn_get_linearized(
+ data->partitions[i]->ctrl->dn));
+ }
+ ret = ldb_next_read_lock(data->partitions[i]->module);
+ if (ret == LDB_SUCCESS) {
+ continue;
+ }
+
+ ldb_debug_set(ldb,
+ LDB_DEBUG_FATAL,
+ "Failed to lock db: %s / %s for %s",
+ ldb_errstring(ldb),
+ ldb_strerror(ret),
+ ldb_dn_get_linearized(
+ data->partitions[i]->ctrl->dn));
+
+ /* Back it out, if it fails on one */
+ for (i--; i >= 0; i--) {
+ ret2 = ldb_next_read_unlock(data->partitions[i]->module);
+ if (ret2 != LDB_SUCCESS) {
+ ldb_debug(ldb,
+ LDB_DEBUG_FATAL,
+ "Failed to unlock db: %s / %s",
+ ldb_errstring(ldb),
+ ldb_strerror(ret2));
+ }
+ }
+ ret2 = ldb_next_read_unlock(module);
+ if (ret2 != LDB_SUCCESS) {
+ ldb_debug(ldb,
+ LDB_DEBUG_FATAL,
+ "Failed to unlock db: %s / %s",
+ ldb_errstring(ldb),
+ ldb_strerror(ret2));
+ }
+ return ret;
+ }
+
+ return LDB_SUCCESS;
+}
+
+/* unlock all the backends */
+static int partition_read_unlock(struct ldb_module *module)
+{
+ int i;
+ int ret = LDB_SUCCESS;
+ int ret2;
+ struct ldb_context *ldb = ldb_module_get_ctx(module);
+ struct partition_private_data *data = \
+ talloc_get_type(ldb_module_get_private(module),
+ struct partition_private_data);
+
+ /*
+ * This order must be similar to partition_{end,del}_trans()
+ * the metadata partition (sam.ldb) unlock must be at the end.
+ */
+
+ for (i=0; data && data->partitions && data->partitions[i]; i++) {
+ if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
+ ldb_debug(ldb, LDB_DEBUG_TRACE,
+ "partition_read_unlock() -> %s",
+ ldb_dn_get_linearized(
+ data->partitions[i]->ctrl->dn));
+ }
+ ret2 = ldb_next_read_unlock(data->partitions[i]->module);
+ if (ret2 != LDB_SUCCESS) {
+ ldb_debug_set(ldb,
+ LDB_DEBUG_FATAL,
+ "Failed to lock db: %s / %s for %s",
+ ldb_errstring(ldb),
+ ldb_strerror(ret),
+ ldb_dn_get_linearized(
+ data->partitions[i]->ctrl->dn));
+
+ /*
+ * Don't overwrite the original failure code
+ * if there was one
+ */
+ if (ret == LDB_SUCCESS) {
+ ret = ret2;
+ }
+ }
+ }
+
+ if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
+ ldb_debug(ldb, LDB_DEBUG_TRACE,
+ "partition_read_unlock() -> (metadata partition)");
+ }
+
+ ret2 = ldb_next_read_unlock(module);
+ if (ret2 != LDB_SUCCESS) {
+ ldb_debug_set(ldb,
+ LDB_DEBUG_FATAL,
+ "Failed to unlock db: %s / %s for metadata partition",
+ ldb_errstring(ldb),
+ ldb_strerror(ret2));
+
+ /*
+ * Don't overwrite the original failure code
+ * if there was one
+ */
+ if (ret == LDB_SUCCESS) {
+ ret = ret2;
+ }
+ }
+
+ return ret;
+}
+
/* extended */
static int partition_extended(struct ldb_module *module, struct ldb_request *req)
{
@@ -1198,6 +1370,8 @@ static const struct ldb_module_ops ldb_partition_module_ops = {
.prepare_commit = partition_prepare_commit,
.end_transaction = partition_end_trans,
.del_transaction = partition_del_trans,
+ .read_lock = partition_read_lock,
+ .read_unlock = partition_read_unlock
};
int ldb_partition_module_init(const char *version)