summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarc G. Fournier <scrappy@hub.org>1996-10-11 03:20:52 +0000
committerMarc G. Fournier <scrappy@hub.org>1996-10-11 03:20:52 +0000
commit73010f5044f24d91f7d625714d17b62bbd494b53 (patch)
tree94199547871851c8eb63f093aedfb3beec61bda7
parent3208b4d7d88a5a645b7ba644d806aa3926227466 (diff)
downloadpostgresql-73010f5044f24d91f7d625714d17b62bbd494b53.tar.gz
I have written some patches to the postgres lock manager which allow the
use of long term cooperative locks managed by the user applications. Submitted by: Massimo Dal Zotto <dz@cs.unitn.it>
-rw-r--r--src/backend/storage/lmgr/lock.c306
-rw-r--r--src/backend/storage/lmgr/proc.c8
2 files changed, 311 insertions, 3 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 9e618f0dbe..c3290c941f 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.2 1996/07/30 07:47:33 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.2.2.1 1996/10/11 03:20:48 scrappy Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -366,6 +366,48 @@ LockTabRename(LockTableId tableId)
* Side Effects: The lock is always acquired. No way to abort
* a lock acquisition other than aborting the transaction.
* Lock is recorded in the lkchain.
+#ifdef USER_LOCKS
+ * Note on User Locks:
+ * User locks are handled totally on the application side as
+ * long term cooperative locks which extend beyond the normal
+ * transaction boundaries. Their purpose is to indicate to an
+ * application that someone is `working' on an item. So it is
+ * possible to put a user lock on a tuple's oid, retrieve the
+ * tuple, work on it for an hour and then update it and remove
+ * the lock. While the lock is active other clients can still
+ * read and write the tuple but they can be aware that it has
+ * been locked at the application level by someone.
+ * User locks use lock tags made of a uint16 and a uint32, for
+ * example 0 and a tuple oid, or any other arbitrary pair of
+ * numbers following a convention established by the application.
+ * In this sense tags don't refer to tuples or database entities.
+ * User locks and normal locks are completely orthogonal and
+ * they don't interfere with each other, so it is possible
+ * to acquire a normal lock on a user-locked tuple or user-lock
+ * a tuple for which a normal write lock already exists.
+ * User locks are always non blocking, therefore they are never
+ * acquired if already held by another process. They must be
+ * released explicitly by the application but they are released
+ * automatically when a backend terminates.
+ * They are indicated by a dummy tableId 0 which doesn't have
+ * any table allocated but uses the normal lock table, and are
+ * distinguished from normal locks for the following differences:
+ *
+ * normal lock user lock
+ *
+ * tableId 1 0
+ * tag.relId rel oid 0
+ * tag.ItemPointerData.ip_blkid block id lock id2
+ * tag.ItemPointerData.ip_posid tuple offset lock id1
+ * xid.pid 0 backend pid
+ * xid.xid current xid 0
+ * persistence transaction user or backend
+ *
+ * The lockt parameter can have the same values for normal locks
+ * although probably only WRITE_LOCK can have some practical use.
+ *
+ * DZ - 4 Oct 1996
+#endif
*/
bool
LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
@@ -379,6 +421,22 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
int status;
TransactionId myXid;
+#ifdef USER_LOCKS
+ int is_user_lock;
+
+ is_user_lock = (tableId == 0);
+ if (is_user_lock) {
+ tableId = 1;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockAcquire: user lock tag [%u,%u] %d",
+ lockName->tupleId.ip_posid,
+ ((lockName->tupleId.ip_blkid.bi_hi<<16)+
+ lockName->tupleId.ip_blkid.bi_lo),
+ lockt);
+#endif
+ }
+#endif
+
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable)
@@ -445,6 +503,17 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
item.tag.pid = MyPid;
#endif
+#ifdef USER_LOCKS
+ if (is_user_lock) {
+ item.tag.pid = getpid();
+ item.tag.xid = myXid = 0;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockAcquire: user lock xid [%d,%d,%d]",
+ item.tag.lock, item.tag.pid, item.tag.xid);
+#endif
+ }
+#endif
+
result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found);
if (!result)
{
@@ -494,6 +563,25 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
}
else if (status == STATUS_FOUND)
{
+#ifdef USER_LOCKS
+ /*
+ * User locks are non blocking. If we can't acquire a lock
+ * remove the xid entry and return FALSE without waiting.
+ */
+ if (is_user_lock) {
+ if (!result->nHolding) {
+ SHMQueueDelete(&result->queue);
+ hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found);
+ }
+ lock->nHolding--;
+ lock->holders[lockt]--;
+ SpinRelease(masterLock);
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockAcquire: user lock failed");
+#endif
+ return(FALSE);
+ }
+#endif
status = WaitOnLock(ltable, tableId, lock, lockt);
XID_PRINT("Someone granted me the lock", result);
}
@@ -690,6 +778,22 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
HTAB *xidTable;
bool wakeupNeeded = true;
+#ifdef USER_LOCKS
+ int is_user_lock;
+
+ is_user_lock = (tableId == 0);
+ if (is_user_lock) {
+ tableId = 1;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockRelease: user lock tag [%u,%u] %d",
+ lockName->tupleId.ip_posid,
+ ((lockName->tupleId.ip_blkid.bi_hi<<16)+
+ lockName->tupleId.ip_blkid.bi_lo),
+ lockt);
+#endif
+ }
+#endif
+
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable) {
@@ -713,6 +817,18 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
lock = (LOCK *)
hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found);
+#ifdef USER_LOCKS
+ /*
+ * If the entry is not found hash_search returns TRUE
+ * instead of NULL, so we must check it explicitly.
+ */
+ if ((is_user_lock) && (lock == (LOCK *)TRUE)) {
+ SpinRelease(masterLock);
+ elog(NOTICE,"LockRelease: there are no locks with this tag");
+ return(FALSE);
+ }
+#endif
+
/* let the caller print its own error message, too.
* Do not elog(WARN).
*/
@@ -732,6 +848,14 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
Assert(lock->nHolding > 0);
+#ifdef USER_LOCKS
+ /*
+ * If this is a user lock it can be removed only after
+ * checking that it was acquired by the current process,
+ * so this code is skipped and executed later.
+ */
+ if (!is_user_lock) {
+#endif
/*
* fix the general lock stats
*/
@@ -758,6 +882,9 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
Assert(lock && found);
wakeupNeeded = false;
}
+#ifdef USER_LOCKS
+ }
+#endif
/* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long
@@ -772,6 +899,17 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
item.tag.pid = MyPid;
#endif
+#ifdef USER_LOCKS
+ if (is_user_lock) {
+ item.tag.pid = getpid();
+ item.tag.xid = 0;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockRelease: user lock xid [%d,%d,%d]",
+ item.tag.lock, item.tag.pid, item.tag.xid);
+#endif
+ }
+#endif
+
if (! ( result = (XIDLookupEnt *) hash_search(xidTable,
(Pointer)&item,
HASH_FIND_SAVE,
@@ -779,7 +917,15 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
|| !found)
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ if ((is_user_lock) && (result)) {
+ elog(NOTICE,"LockRelease: you don't have a lock on this tag");
+ } else {
+ elog(NOTICE,"LockRelease: find xid, table corrupted");
+ }
+#else
elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
return(FALSE);
}
/*
@@ -797,6 +943,14 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
*/
if (! result->nHolding)
{
+#ifdef USER_LOCKS
+ if (result->queue.prev == INVALID_OFFSET) {
+ elog(NOTICE,"LockRelease: xid.prev == INVALID_OFFSET");
+ }
+ if (result->queue.next == INVALID_OFFSET) {
+ elog(NOTICE,"LockRelease: xid.next == INVALID_OFFSET");
+ }
+#endif
if (result->queue.next != INVALID_OFFSET)
SHMQueueDelete(&result->queue);
if (! (result = (XIDLookupEnt *)
@@ -804,11 +958,50 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
! found)
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ elog(NOTICE,"LockRelease: remove xid, table corrupted");
+#else
elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
return(FALSE);
}
}
+#ifdef USER_LOCKS
+ /*
+ * If this is a user lock, remove it now, after the
+ * corresponding xid entry has been found and deleted.
+ */
+ if (is_user_lock) {
+ /*
+ * fix the general lock stats
+ */
+ lock->nHolding--;
+ lock->holders[lockt]--;
+ lock->nActive--;
+ lock->activeHolders[lockt]--;
+
+ Assert(lock->nActive >= 0);
+
+ if (! lock->nHolding)
+ {
+ /* ------------------
+ * if there's no one waiting in the queue,
+ * we just released the last lock.
+ * Delete it from the lock table.
+ * ------------------
+ */
+ Assert( ltable->lockHash->hash == tag_hash);
+ lock = (LOCK *) hash_search(ltable->lockHash,
+ (Pointer) &(lock->tag),
+ HASH_REMOVE,
+ &found);
+ Assert(lock && found);
+ wakeupNeeded = false;
+ }
+ }
+#endif
+
/* --------------------------
* If there are still active locks of the type I just released, no one
* should be woken up. Whoever is asleep will still conflict
@@ -848,6 +1041,18 @@ GrantLock(LOCK *lock, LOCKT lockt)
lock->mask |= BITS_ON[lockt];
}
+#ifdef USER_LOCKS
+/*
+ * LockReleaseAll -- Release all locks in a process lock queue.
+ *
+ * Note: This code is a little complicated by the presence in the
+ * same queue of user locks which can't be removed from the
+ * normal lock queue at the end of a transaction. They must
+ * however be removed when the backend exits.
+ * A dummy tableId 0 is used to indicate that we are releasing
+ * the user locks, from the code added to ProcKill().
+ */
+#endif
bool
LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
{
@@ -862,6 +1067,19 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
LOCK *lock;
bool found;
+#ifdef USER_LOCKS
+ int is_user_lock_table, my_pid, count, nskip;
+
+ is_user_lock_table = (tableId == 0);
+ my_pid = getpid();
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockReleaseAll: tableId=%d, pid=%d", tableId, my_pid);
+#endif
+ if (is_user_lock_table) {
+ tableId = 1;
+ }
+#endif
+
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable)
@@ -873,11 +1091,18 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
if (SHMQueueEmpty(lockQueue))
return TRUE;
+#ifdef USER_LOCKS
+ SpinAcquire(masterLock);
+#endif
SHMQueueFirst(lockQueue,(Pointer*)&xidLook,&xidLook->queue);
XID_PRINT("LockReleaseAll:", xidLook);
+#ifndef USER_LOCKS
SpinAcquire(masterLock);
+#else
+ count = nskip = 0;
+#endif
for (;;)
{
/* ---------------------------
@@ -891,6 +1116,65 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
LOCK_PRINT("ReleaseAll",(&lock->tag),0);
+#ifdef USER_LOCKS
+ /*
+ * Sometimes the queue appears to be messed up.
+ */
+ if (count++ > 2000) {
+ elog(NOTICE,"LockReleaseAll: xid loop detected, giving up");
+ nskip = 0;
+ break;
+ }
+ if (is_user_lock_table) {
+ if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) {
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockReleaseAll: skip normal lock [%d,%d,%d]",
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ nskip++;
+ goto next_item;
+ }
+ if (xidLook->tag.pid != my_pid) {
+ /* This should never happen */
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,
+ "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ nskip++;
+ goto next_item;
+ }
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,
+ "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ } else {
+ if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) {
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,
+ "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ nskip++;
+ goto next_item;
+ }
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockReleaseAll: release normal lock [%d,%d,%d]",
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ }
+#endif
+
/* ------------------
* fix the general lock stats
* ------------------
@@ -921,11 +1205,18 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
* always remove the xidLookup entry, we're done with it now
* ----------------
*/
+#ifdef USER_LOCKS
+ SHMQueueDelete(&xidLook->queue);
+#endif
if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found))
|| !found)
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ elog(NOTICE,"LockReleaseAll: xid table corrupted");
+#else
elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
return(FALSE);
}
@@ -943,7 +1234,11 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
if ((! lock) || (!found))
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ elog(NOTICE,"LockReleaseAll: cannot remove lock from HTAB");
+#else
elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
+#endif
return(FALSE);
}
}
@@ -959,12 +1254,21 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
(void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
}
+#ifdef USER_LOCKS
+ next_item:
+#endif
if (done)
break;
SHMQueueFirst(&xidLook->queue,(Pointer*)&tmp,&tmp->queue);
xidLook = tmp;
}
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ /*
+ * Reinitialize the queue only if nothing has been left in.
+ */
+ if (nskip == 0)
+#endif
SHMQueueInit(lockQueue);
return TRUE;
}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index edba19c319..9bfab675fe 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5 1996/08/01 05:11:33 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5.2.1 1996/10/11 03:20:52 scrappy Exp $
*
*-------------------------------------------------------------------------
*/
@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5 1996/08/01 05:11:33 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5.2.1 1996/10/11 03:20:52 scrappy Exp $
*/
#include <sys/time.h>
#ifndef WIN32
@@ -361,6 +361,10 @@ ProcKill(int exitStatus, int pid)
ProcReleaseSpins(proc);
LockReleaseAll(1,&proc->lockQueue);
+#ifdef USER_LOCKS
+ LockReleaseAll(0,&proc->lockQueue);
+#endif
+
/* ----------------
* get off the wait queue
* ----------------