diff options
Diffstat (limited to 'source/lib/tdb/common')
-rw-r--r-- | source/lib/tdb/common/lock.c | 147 | ||||
-rw-r--r-- | source/lib/tdb/common/tdb.c | 33 | ||||
-rw-r--r-- | source/lib/tdb/common/tdb_private.h | 3 | ||||
-rw-r--r-- | source/lib/tdb/common/transaction.c | 9 | ||||
-rw-r--r-- | source/lib/tdb/common/traverse.c | 14 |
5 files changed, 174 insertions, 32 deletions
diff --git a/source/lib/tdb/common/lock.c b/source/lib/tdb/common/lock.c index 43be5d20e1e..e3fe888c465 100644 --- a/source/lib/tdb/common/lock.c +++ b/source/lib/tdb/common/lock.c @@ -27,6 +27,8 @@ #include "tdb_private.h" +#define TDB_MARK_LOCK 0x80000000 + void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr) { tdb->interrupt_sig_ptr = ptr; @@ -116,10 +118,13 @@ int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len) /* lock a list in the database. list -1 is the alloc list */ -int tdb_lock(struct tdb_context *tdb, int list, int ltype) +static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op) { struct tdb_lock_type *new_lck; int i; + bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); + + ltype &= ~TDB_MARK_LOCK; /* a global lock allows us to avoid per chain locks */ if (tdb->global_lock.count && @@ -169,10 +174,9 @@ int tdb_lock(struct tdb_context *tdb, int list, int ltype) /* Since fcntl locks don't nest, we do a lock for the first one, and simply bump the count for future ones */ - if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, + if (!mark_lock && + tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op, 0, 1)) { - TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d " - "ltype=%d (%s)\n", list, ltype, strerror(errno))); return -1; } @@ -186,6 +190,25 @@ int tdb_lock(struct tdb_context *tdb, int list, int ltype) return 0; } +/* lock a list in the database. list -1 is the alloc list */ +int tdb_lock(struct tdb_context *tdb, int list, int ltype) +{ + int ret; + ret = _tdb_lock(tdb, list, ltype, F_SETLKW); + if (ret) { + TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d " + "ltype=%d (%s)\n", list, ltype, strerror(errno))); + } + return ret; +} + +/* lock a list in the database. list -1 is the alloc list. non-blocking lock */ +int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype) +{ + return _tdb_lock(tdb, list, ltype, F_SETLK); +} + + /* unlock the database: returns void because it's too late for errors. */ /* changed to return int it may be interesting to know there has been an error --simo */ @@ -194,6 +217,9 @@ int tdb_unlock(struct tdb_context *tdb, int list, int ltype) int ret = -1; int i; struct tdb_lock_type *lck = NULL; + bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); + + ltype &= ~TDB_MARK_LOCK; /* a global lock allows us to avoid per chain locks */ if (tdb->global_lock.count && @@ -238,8 +264,12 @@ int tdb_unlock(struct tdb_context *tdb, int list, int ltype) * anyway. */ - ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, - F_SETLKW, 0, 1); + if (mark_lock) { + ret = 0; + } else { + ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, + F_SETLKW, 0, 1); + } tdb->num_locks--; /* @@ -266,11 +296,50 @@ int tdb_unlock(struct tdb_context *tdb, int list, int ltype) return ret; } +/* + get the transaction lock + */ +int tdb_transaction_lock(struct tdb_context *tdb, int ltype) +{ + if (tdb->have_transaction_lock || tdb->global_lock.count) { + return 0; + } + if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype, + F_SETLKW, 0, 1) == -1) { + TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n")); + tdb->ecode = TDB_ERR_LOCK; + return -1; + } + tdb->have_transaction_lock = 1; + return 0; +} + +/* + release the transaction lock + */ +int tdb_transaction_unlock(struct tdb_context *tdb) +{ + int ret; + if (!tdb->have_transaction_lock) { + return 0; + } + ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); + if (ret == 0) { + tdb->have_transaction_lock = 0; + } + return ret; +} + + /* lock/unlock entire database */ -static int _tdb_lockall(struct tdb_context *tdb, int ltype) +static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op) { + bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); + + ltype &= ~TDB_MARK_LOCK; + /* There are no locks on read-only dbs */ if (tdb->read_only || tdb->traverse_read) return TDB_ERRCODE(TDB_ERR_LOCK, -1); @@ -290,9 +359,12 @@ static int _tdb_lockall(struct tdb_context *tdb, int ltype) return TDB_ERRCODE(TDB_ERR_LOCK, -1); } - if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, F_SETLKW, + if (!mark_lock && + tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op, 0, 4*tdb->header.hash_size)) { - TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno))); + if (op == F_SETLKW) { + TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno))); + } return -1; } @@ -302,9 +374,15 @@ static int _tdb_lockall(struct tdb_context *tdb, int ltype) return 0; } + + /* unlock entire db */ static int _tdb_unlockall(struct tdb_context *tdb, int ltype) { + bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK); + + ltype &= ~TDB_MARK_LOCK; + /* There are no locks on read-only dbs */ if (tdb->read_only || tdb->traverse_read) { return TDB_ERRCODE(TDB_ERR_LOCK, -1); @@ -319,7 +397,8 @@ static int _tdb_unlockall(struct tdb_context *tdb, int ltype) return 0; } - if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, + if (!mark_lock && + tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size)) { TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno))); return -1; @@ -334,7 +413,25 @@ static int _tdb_unlockall(struct tdb_context *tdb, int ltype) /* lock entire database with write lock */ int tdb_lockall(struct tdb_context *tdb) { - return _tdb_lockall(tdb, F_WRLCK); + return _tdb_lockall(tdb, F_WRLCK, F_SETLKW); +} + +/* lock entire database with write lock - mark only */ +int tdb_lockall_mark(struct tdb_context *tdb) +{ + return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW); +} + +/* unlock entire database with write lock - unmark only */ +int tdb_lockall_unmark(struct tdb_context *tdb) +{ + return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK); +} + +/* lock entire database with write lock - nonblocking varient */ +int tdb_lockall_nonblock(struct tdb_context *tdb) +{ + return _tdb_lockall(tdb, F_WRLCK, F_SETLK); } /* unlock entire database with write lock */ @@ -346,7 +443,13 @@ int tdb_unlockall(struct tdb_context *tdb) /* lock entire database with read lock */ int tdb_lockall_read(struct tdb_context *tdb) { - return _tdb_lockall(tdb, F_RDLCK); + return _tdb_lockall(tdb, F_RDLCK, F_SETLKW); +} + +/* lock entire database with read lock - nonblock varient */ +int tdb_lockall_read_nonblock(struct tdb_context *tdb) +{ + return _tdb_lockall(tdb, F_RDLCK, F_SETLK); } /* unlock entire database with read lock */ @@ -362,6 +465,26 @@ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key) return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK); } +/* lock/unlock one hash chain, non-blocking. This is meant to be used + to reduce contention - it cannot guarantee how many records will be + locked */ +int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key) +{ + return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK); +} + +/* mark a chain as locked without actually locking it. Warning! use with great caution! */ +int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key) +{ + return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK); +} + +/* unmark a chain as locked without actually locking it. Warning! use with great caution! */ +int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key) +{ + return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK); +} + int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) { return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK); diff --git a/source/lib/tdb/common/tdb.c b/source/lib/tdb/common/tdb.c index 8f3c146843b..0e9d1dbd741 100644 --- a/source/lib/tdb/common/tdb.c +++ b/source/lib/tdb/common/tdb.c @@ -30,10 +30,10 @@ TDB_DATA tdb_null; /* - increment the tdb sequence number if the tdb has been opened using + non-blocking increment of the tdb sequence number if the tdb has been opened using the TDB_SEQNUM flag */ -static void tdb_increment_seqnum(struct tdb_context *tdb) +void tdb_increment_seqnum_nonblock(struct tdb_context *tdb) { tdb_off_t seqnum=0; @@ -41,16 +41,29 @@ static void tdb_increment_seqnum(struct tdb_context *tdb) return; } - if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) { - return; - } - /* we ignore errors from this, as we have no sane way of dealing with them. */ tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum); seqnum++; tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum); +} + +/* + increment the tdb sequence number if the tdb has been opened using + the TDB_SEQNUM flag +*/ +static void tdb_increment_seqnum(struct tdb_context *tdb) +{ + if (!(tdb->flags & TDB_SEQNUM)) { + return; + } + + if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) { + return; + } + + tdb_increment_seqnum_nonblock(tdb); tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1); } @@ -653,3 +666,11 @@ int tdb_get_flags(struct tdb_context *tdb) return tdb->flags; } + +/* + enable sequence number handling on an open tdb +*/ +void tdb_enable_seqnum(struct tdb_context *tdb) +{ + tdb->flags |= TDB_SEQNUM; +} diff --git a/source/lib/tdb/common/tdb_private.h b/source/lib/tdb/common/tdb_private.h index daef07aff1c..00bd0eb537a 100644 --- a/source/lib/tdb/common/tdb_private.h +++ b/source/lib/tdb/common/tdb_private.h @@ -162,6 +162,7 @@ struct tdb_context { struct tdb_transaction *transaction; int page_size; int max_dead_records; + bool have_transaction_lock; volatile sig_atomic_t *interrupt_sig_ptr; }; @@ -174,6 +175,8 @@ void tdb_mmap(struct tdb_context *tdb); int tdb_lock(struct tdb_context *tdb, int list, int ltype); int tdb_unlock(struct tdb_context *tdb, int list, int ltype); int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len); +int tdb_transaction_lock(struct tdb_context *tdb, int ltype); +int tdb_transaction_unlock(struct tdb_context *tdb); int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len); int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off); int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off); diff --git a/source/lib/tdb/common/transaction.c b/source/lib/tdb/common/transaction.c index 8263bd7f388..7eaacf7a164 100644 --- a/source/lib/tdb/common/transaction.c +++ b/source/lib/tdb/common/transaction.c @@ -422,9 +422,7 @@ int tdb_transaction_start(struct tdb_context *tdb) /* get the transaction write lock. This is a blocking lock. As discussed with Volker, there are a number of ways we could make this async, which we will probably do in the future */ - if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) { - TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n")); - tdb->ecode = TDB_ERR_LOCK; + if (tdb_transaction_lock(tdb, F_WRLCK) == -1) { SAFE_FREE(tdb->transaction); return -1; } @@ -468,6 +466,7 @@ int tdb_transaction_start(struct tdb_context *tdb) TDB_HASHTABLE_SIZE(tdb)) != 0) { TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n")); tdb->ecode = TDB_ERR_IO; + tdb->methods = tdb->transaction->io_methods; goto fail; } @@ -475,7 +474,7 @@ int tdb_transaction_start(struct tdb_context *tdb) fail: tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0); - tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); + tdb_transaction_unlock(tdb); SAFE_FREE(tdb->transaction->hash_heads); SAFE_FREE(tdb->transaction); return -1; @@ -530,7 +529,7 @@ int tdb_transaction_cancel(struct tdb_context *tdb) tdb->methods = tdb->transaction->io_methods; tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0); - tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); + tdb_transaction_unlock(tdb); SAFE_FREE(tdb->transaction->hash_heads); SAFE_FREE(tdb->transaction); diff --git a/source/lib/tdb/common/traverse.c b/source/lib/tdb/common/traverse.c index 86fa4fd48e6..6fc576a55a3 100644 --- a/source/lib/tdb/common/traverse.c +++ b/source/lib/tdb/common/traverse.c @@ -204,12 +204,10 @@ int tdb_traverse_read(struct tdb_context *tdb, { struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK }; int ret; - + /* we need to get a read lock on the transaction lock here to cope with the lock ordering semantics of solaris10 */ - if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1) { - TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse_read: failed to get transaction lock\n")); - tdb->ecode = TDB_ERR_LOCK; + if (tdb_transaction_lock(tdb, F_RDLCK)) { return -1; } @@ -217,7 +215,7 @@ int tdb_traverse_read(struct tdb_context *tdb, ret = tdb_traverse_internal(tdb, fn, private_data, &tl); tdb->traverse_read--; - tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); + tdb_transaction_unlock(tdb); return ret; } @@ -236,15 +234,13 @@ int tdb_traverse(struct tdb_context *tdb, return tdb_traverse_read(tdb, fn, private_data); } - if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) { - TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse: failed to get transaction lock\n")); - tdb->ecode = TDB_ERR_LOCK; + if (tdb_transaction_lock(tdb, F_WRLCK)) { return -1; } ret = tdb_traverse_internal(tdb, fn, private_data, &tl); - tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1); + tdb_transaction_unlock(tdb); return ret; } |