summaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr
diff options
context:
space:
mode:
authorMarc G. Fournier <scrappy@hub.org>1998-08-25 21:20:32 +0000
committerMarc G. Fournier <scrappy@hub.org>1998-08-25 21:20:32 +0000
commit7dbcf31be29d2f4b57dbbb2c57a3c783e3595ac6 (patch)
tree0d3683569b87913a87d6d4d11eec5b6ee8c7eed2 /src/backend/storage/lmgr
parent1acf0d85fefca64737fa05f99d7b6dd44d7bc783 (diff)
downloadpostgresql-7dbcf31be29d2f4b57dbbb2c57a3c783e3595ac6.tar.gz
From: Massimo Dal Zotto <dz@cs.unitn.it>
lock.patch I have rewritten lock.c cleaning up the code and adding better assert checking I have also added some fields to the lock and xid tags for better support of user locks. There is also a new function which returns an array of pids owning a lock. I'm using this code from over six months and it works fine.
Diffstat (limited to 'src/backend/storage/lmgr')
-rw-r--r--src/backend/storage/lmgr/lock.c1267
-rw-r--r--src/backend/storage/lmgr/multi.c28
-rw-r--r--src/backend/storage/lmgr/proc.c101
3 files changed, 911 insertions, 485 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 4b83c9c0d1..b35008a3bb 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.33 1998/08/25 21:20:26 scrappy Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -18,11 +18,9 @@
*
* Interface:
*
- * LockAcquire(), LockRelease(), LockMethodTableInit().
- *
- * LockReplace() is called only within this module and by the
- * lkchain module. It releases a lock without looking
- * the lock up in the lock table.
+ * LockAcquire(), LockRelease(), LockMethodTableInit(),
+ * LockMethodTableRename(), LockReleaseAll, LockOwners()
+ * LockResolveConflicts(), GrantLock()
*
* NOTE: This module is used to define new lock tables. The
* multi-level lock table (multi.c) used by the heap
@@ -35,6 +33,7 @@
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
+#include <signal.h>
#include "postgres.h"
#include "miscadmin.h"
@@ -49,25 +48,110 @@
#include "utils/palloc.h"
#include "access/xact.h"
#include "access/transam.h"
+#include "utils/trace.h"
+#include "utils/ps_status.h"
-static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
-
-/*#define LOCK_MGR_DEBUG*/
-
-#ifndef LOCK_MGR_DEBUG
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+ TransactionId xid);
-#define LOCK_PRINT(where,tag,type)
-#define LOCK_DUMP(where,lock,type)
-#define LOCK_DUMP_AUX(where,lock,type)
+/*
+ * lockDebugRelation can be used to trace unconditionally a single relation,
+ * for example pg_listener, if you suspect there are locking problems.
+ *
+ * lockDebugOidMin is is used to avoid tracing postgres relations, which
+ * would produce a lot of output. Unfortunately most system relations are
+ * created after bootstrap and have oid greater than BootstrapObjectIdData.
+ * If you are using tprintf you should specify a value greater than the max
+ * oid of system relations, which can be found with the following query:
+ *
+ * select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
+ *
+ * To get a useful lock trace you can use the following pg_options:
+ *
+ * -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
+ */
+#define LOCKDEBUG(lockmethod) (pg_options[TRACE_SHORTLOCKS+lockmethod])
+#define lockDebugRelation (pg_options[TRACE_LOCKRELATION])
+#define lockDebugOidMin (pg_options[TRACE_LOCKOIDMIN])
+#define lockReadPriority (pg_options[OPT_LOCKREADPRIORITY])
+
+#ifdef LOCK_MGR_DEBUG
+#define LOCK_PRINT(where,lock,type) \
+ if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+ && (lock->tag.relId >= lockDebugOidMin)) \
+ || (lock->tag.relId == lockDebugRelation)) \
+ LOCK_PRINT_AUX(where,lock,type)
+
+#define LOCK_PRINT_AUX(where,lock,type) \
+ TPRINTF(TRACE_ALL, \
+ "%s: lock(%x) tbl(%d) rel(%d) db(%d) tid(%d,%d) mask(%x) " \
+ "hold(%d,%d,%d,%d,%d)=%d " \
+ "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
+ where, \
+ MAKE_OFFSET(lock), \
+ lock->tag.lockmethod, \
+ lock->tag.relId, \
+ lock->tag.dbId, \
+ ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+ \
+ lock->tag.tupleId.ip_blkid.bi_lo), \
+ lock->tag.tupleId.ip_posid, \
+ lock->mask, \
+ lock->holders[1], \
+ lock->holders[2], \
+ lock->holders[3], \
+ lock->holders[4], \
+ lock->holders[5], \
+ lock->nHolding, \
+ lock->activeHolders[1], \
+ lock->activeHolders[2], \
+ lock->activeHolders[3], \
+ lock->activeHolders[4], \
+ lock->activeHolders[5], \
+ lock->nActive, \
+ lock->waitProcs.size, \
+ lock_types[type])
+
+#define XID_PRINT(where,xidentP) \
+ if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \
+ && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+ >= lockDebugOidMin)) \
+ || (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+ == lockDebugRelation)) \
+ XID_PRINT_AUX(where,xidentP)
+
+#define XID_PRINT_AUX(where,xidentP) \
+ TPRINTF(TRACE_ALL, \
+ "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
+ "hold(%d,%d,%d,%d,%d)=%d", \
+ where, \
+ MAKE_OFFSET(xidentP), \
+ xidentP->tag.lock, \
+ XIDENT_LOCKMETHOD(*(xidentP)), \
+ xidentP->tag.pid, \
+ xidentP->tag.xid, \
+ xidentP->holders[1], \
+ xidentP->holders[2], \
+ xidentP->holders[3], \
+ xidentP->holders[4], \
+ xidentP->holders[5], \
+ xidentP->nHolding)
+
+#define LOCK_TPRINTF(lock, args...) \
+ if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+ && (lock->tag.relId >= lockDebugOidMin)) \
+ || (lock->tag.relId == lockDebugRelation)) \
+ TPRINTF(TRACE_ALL, args)
+
+#else /* !LOCK_MGR_DEBUG */
+#define LOCK_PRINT(where,lock,type)
+#define LOCK_PRINT_AUX(where,lock,type)
#define XID_PRINT(where,xidentP)
+#define XID_PRINT_AUX(where,xidentP)
+#define LOCK_TPRINTF(lock, args...)
+#endif /* !LOCK_MGR_DEBUG */
-#else /* LOCK_MGR_DEBUG */
-
-int lockDebug = 0;
-unsigned int lock_debug_oid_min = BootstrapObjectIdData;
static char *lock_types[] = {
- "NONE",
+ "",
"WRITE",
"READ",
"WRITE INTENT",
@@ -75,59 +159,6 @@ static char *lock_types[] = {
"EXTEND"
};
-#define LOCK_PRINT(where,tag,type)\
- if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
- elog(DEBUG, \
- "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
- MyProcPid,\
- tag->relId, tag->dbId, \
- ((tag->tupleId.ip_blkid.bi_hi<<16)+\
- tag->tupleId.ip_blkid.bi_lo),\
- tag->tupleId.ip_posid, \
- lock_types[type])
-
-#define LOCK_DUMP(where,lock,type)\
- if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
- LOCK_DUMP_AUX(where,lock,type)
-
-#define LOCK_DUMP_AUX(where,lock,type)\
- elog(DEBUG, \
- "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
- "holders (%d,%d,%d,%d,%d) type (%s)",where, \
- MyProcPid,\
- lock->tag.relId, lock->tag.dbId, \
- ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
- lock->tag.tupleId.ip_blkid.bi_lo),\
- lock->tag.tupleId.ip_posid, \
- lock->nHolding,\
- lock->holders[1],\
- lock->holders[2],\
- lock->holders[3],\
- lock->holders[4],\
- lock->holders[5],\
- lock_types[type])
-
-#define XID_PRINT(where,xidentP)\
- if ((lockDebug >= 2) && \
- (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
- >= lock_debug_oid_min)) \
- elog(DEBUG,\
- "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
- "holders (%d,%d,%d,%d,%d)",\
- where,\
- MyProcPid,\
- xidentP->tag.xid,\
- xidentP->tag.pid,\
- xidentP->tag.lock,\
- xidentP->nHolding,\
- xidentP->holders[1],\
- xidentP->holders[2],\
- xidentP->holders[3],\
- xidentP->holders[4],\
- xidentP->holders[5])
-
-#endif /* LOCK_MGR_DEBUG */
-
SPINLOCK LockMgrLock; /* in Shmem or created in
* CreateSpinlocks() */
@@ -171,6 +202,16 @@ InitLocks()
BITS_ON[i] = bit;
BITS_OFF[i] = ~bit;
}
+
+#ifdef LOCK_MGR_DEBUG
+ /*
+ * If lockDebugOidMin value has not been specified
+ * in pg_options set a default value.
+ */
+ if (!lockDebugOidMin) {
+ lockDebugOidMin = BootstrapObjectIdData;
+ }
+#endif
}
/* -------------------
@@ -234,7 +275,7 @@ LockMethodTableInit(char *tabName,
{
elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
numModes, MAX_LOCKMODES);
- return (INVALID_TABLEID);
+ return (INVALID_LOCKMETHOD);
}
/* allocate a string for the shmem index table lookup */
@@ -242,7 +283,7 @@ LockMethodTableInit(char *tabName,
if (!shmemName)
{
elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
- return (INVALID_TABLEID);
+ return (INVALID_LOCKMETHOD);
}
/* each lock table has a non-shared header */
@@ -251,7 +292,7 @@ LockMethodTableInit(char *tabName,
{
elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
pfree(shmemName);
- return (INVALID_TABLEID);
+ return (INVALID_LOCKMETHOD);
}
/* ------------------------
@@ -354,7 +395,7 @@ LockMethodTableInit(char *tabName,
if (status)
return (lockMethodTable->ctl->lockmethod);
else
- return (INVALID_TABLEID);
+ return (INVALID_LOCKMETHOD);
}
/*
@@ -369,16 +410,16 @@ LockMethodTableInit(char *tabName,
* client to use different lockmethods when acquiring/releasing
* short term and long term locks.
*/
-#ifdef NOT_USED
+
LOCKMETHOD
LockMethodTableRename(LOCKMETHOD lockmethod)
{
LOCKMETHOD newLockMethod;
if (NumLockMethods >= MAX_LOCK_METHODS)
- return (INVALID_TABLEID);
- if (LockMethodTable[lockmethod] == INVALID_TABLEID)
- return (INVALID_TABLEID);
+ return (INVALID_LOCKMETHOD);
+ if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
+ return (INVALID_LOCKMETHOD);
/* other modules refer to the lock table by a lockmethod */
newLockMethod = NumLockMethods;
@@ -387,7 +428,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
return (newLockMethod);
}
-#endif
/*
* LockAcquire -- Check for lock conflicts, sleep if conflict found,
@@ -398,8 +438,11 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
* Side Effects: The lock is always acquired. No way to abort
* a lock acquisition other than aborting the transaction.
* Lock is recorded in the lkchain.
+ *
#ifdef USER_LOCKS
+ *
* Note on User Locks:
+ *
* User locks are handled totally on the application side as
* long term cooperative locks which extend beyond the normal
* transaction boundaries. Their purpose is to indicate to an
@@ -421,24 +464,24 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
* acquired if already held by another process. They must be
* released explicitly by the application but they are released
* automatically when a backend terminates.
- * They are indicated by a dummy lockmethod 0 which doesn't have
- * any table allocated but uses the normal lock table, and are
- * distinguished from normal locks for the following differences:
+ * They are indicated by a lockmethod 2 which is an alias for the
+ * normal lock table, and are distinguished from normal locks
+ * for the following differences:
*
* normal lock user lock
*
- * lockmethod 1 0
+ * lockmethod 1 2
* tag.relId rel oid 0
* tag.ItemPointerData.ip_blkid block id lock id2
* tag.ItemPointerData.ip_posid tuple offset lock id1
* xid.pid 0 backend pid
- * xid.xid current xid 0
+ * xid.xid xid or 0 0
* persistence transaction user or backend
*
* The lockmode parameter can have the same values for normal locks
* although probably only WRITE_LOCK can have some practical use.
*
- * DZ - 4 Oct 1996
+ * DZ - 22 Nov 1997
#endif
*/
@@ -453,25 +496,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
int status;
- TransactionId myXid;
+ TransactionId xid;
#ifdef USER_LOCKS
int is_user_lock;
- is_user_lock = (lockmethod == 0);
+ is_user_lock = (lockmethod == USER_LOCKMETHOD);
if (is_user_lock)
{
- lockmethod = 1;
#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
- locktag->tupleId.ip_posid,
- ((locktag->tupleId.ip_blkid.bi_hi << 16) +
- locktag->tupleId.ip_blkid.bi_lo),
- lockmode);
+ TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u,%u] %s",
+ locktag->tupleId.ip_posid,
+ ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+ locktag->tupleId.ip_blkid.bi_lo),
+ lock_types[lockmode]);
#endif
}
#endif
+ /* ???????? This must be changed when short term locks will be used */
+ locktag->lockmethod = lockmethod;
+
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
@@ -483,14 +528,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (LockingIsDisabled)
return (TRUE);
- LOCK_PRINT("Acquire", locktag, lockmode);
masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock);
+ /*
+ * Find or create a lock with this tag
+ */
Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found);
-
+ lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+ HASH_ENTER, &found);
if (!lock)
{
SpinRelease(masterLock);
@@ -505,15 +552,19 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (!found)
{
lock->mask = 0;
- ProcQueueInit(&(lock->waitProcs));
- MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
- MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
lock->nHolding = 0;
lock->nActive = 0;
-
+ MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
+ MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
+ ProcQueueInit(&(lock->waitProcs));
Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
&(locktag->tupleId.ip_blkid)));
-
+ LOCK_PRINT("LockAcquire: new", lock, lockmode);
+ } else {
+ LOCK_PRINT("LockAcquire: found", lock, lockmode);
+ Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+ Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+ Assert(lock->nActive <= lock->nHolding);
}
/* ------------------
@@ -522,7 +573,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* ------------------
*/
xidTable = lockMethodTable->xidHash;
- myXid = GetCurrentTransactionId();
/* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long
@@ -530,36 +580,48 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* ------------------
*/
MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
- TransactionIdStore(myXid, &item.tag.xid);
item.tag.lock = MAKE_OFFSET(lock);
-#if 0
- item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+ item.tag.lockmethod = lockmethod;
#endif
-
#ifdef USER_LOCKS
if (is_user_lock)
{
item.tag.pid = MyProcPid;
- item.tag.xid = myXid = 0;
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
- item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+ item.tag.xid = xid = 0;
+ } else {
+ xid = GetCurrentTransactionId();
+ TransactionIdStore(xid, &item.tag.xid);
}
+#else
+ xid = GetCurrentTransactionId();
+ TransactionIdStore(xid, &item.tag.xid);
#endif
- result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
+ /*
+ * Find or create an xid entry with this tag
+ */
+ result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+ HASH_ENTER, &found);
if (!result)
{
elog(NOTICE, "LockAcquire: xid table corrupted");
return (STATUS_ERROR);
}
+
+ /*
+ * If not found initialize the new entry
+ */
if (!found)
{
- XID_PRINT("LockAcquire: queueing XidEnt", result);
- ProcAddLock(&result->queue);
result->nHolding = 0;
MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
+ ProcAddLock(&result->queue);
+ XID_PRINT("LockAcquire: new", result);
+ } else {
+ XID_PRINT("LockAcquire: found", result);
+ Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
+ Assert(result->nHolding <= lock->nActive);
}
/* ----------------
@@ -570,6 +632,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
lock->nHolding++;
lock->holders[lockmode]++;
+ Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
/* --------------------
* If I'm the only one holding a lock, then there
@@ -582,43 +645,59 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
{
result->holders[lockmode]++;
result->nHolding++;
+ XID_PRINT("LockAcquire: owning", result);
+ Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
GrantLock(lock, lockmode);
SpinRelease(masterLock);
return (TRUE);
}
- Assert(result->nHolding <= lock->nActive);
-
- status = LockResolveConflicts(lockmethod, lock, lockmode, myXid);
-
+ status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
if (status == STATUS_OK)
GrantLock(lock, lockmode);
else if (status == STATUS_FOUND)
{
#ifdef USER_LOCKS
-
/*
- * User locks are non blocking. If we can't acquire a lock remove
- * the xid entry and return FALSE without waiting.
+ * User locks are non blocking. If we can't acquire a lock we must
+ * remove the xid entry and return FALSE without waiting.
*/
if (is_user_lock)
{
if (!result->nHolding)
{
SHMQueueDelete(&result->queue);
- hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
+ result = (XIDLookupEnt *) hash_search(xidTable,
+ (Pointer) &result,
+ HASH_REMOVE, &found);
+ if (!result || !found) {
+ elog(NOTICE, "LockAcquire: remove xid, table corrupted");
+ }
+ } else {
+ XID_PRINT_AUX("LockAcquire: NHOLDING", result);
}
lock->nHolding--;
lock->holders[lockmode]--;
+ LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
+ Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+ Assert(lock->nActive <= lock->nHolding);
SpinRelease(masterLock);
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockAcquire: user lock failed");
-#endif
return (FALSE);
}
#endif
- status = WaitOnLock(lockmethod, lock, lockmode);
- XID_PRINT("Someone granted me the lock", result);
+ status = WaitOnLock(lockmethod, lock, lockmode, xid);
+ /*
+ * Check the xid entry status, in case something in the
+ * ipc communication doesn't work correctly.
+ */
+ if (!((result->nHolding > 0) && (result->holders[lockmode] > 0))) {
+ XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
+ LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
+ /* Should we retry ? */
+ return (FALSE);
+ }
+ XID_PRINT("LockAcquire: granted", result);
+ LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
SpinRelease(masterLock);
@@ -646,7 +725,8 @@ int
LockResolveConflicts(LOCKMETHOD lockmethod,
LOCK *lock,
LOCKMODE lockmode,
- TransactionId xid)
+ TransactionId xid,
+ XIDLookupEnt *xidentP) /* xident ptr or NULL */
{
XIDLookupEnt *result,
item;
@@ -657,44 +737,81 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
int bitmask;
int i,
tmpMask;
+#ifdef USER_LOCKS
+ int is_user_lock;
+#endif
numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
xidTable = LockMethodTable[lockmethod]->xidHash;
- /* ---------------------
- * read my own statistics from the xid table. If there
- * isn't an entry, then we'll just add one.
- *
- * Zero out the tag, this clears the padding bytes for long
- * word alignment and ensures hashing consistency.
- * ------------------
- */
- MemSet(&item, 0, XID_TAGSIZE);
- TransactionIdStore(xid, &item.tag.xid);
- item.tag.lock = MAKE_OFFSET(lock);
-#if 0
- item.tag.pid = pid;
+ if (xidentP) {
+ /*
+ * A pointer to the xid entry was supplied from the caller.
+ * Actually only LockAcquire can do it.
+ */
+ result = xidentP;
+ } else {
+ /* ---------------------
+ * read my own statistics from the xid table. If there
+ * isn't an entry, then we'll just add one.
+ *
+ * Zero out the tag, this clears the padding bytes for long
+ * word alignment and ensures hashing consistency.
+ * ------------------
+ */
+ MemSet(&item, 0, XID_TAGSIZE);
+ item.tag.lock = MAKE_OFFSET(lock);
+#ifdef USE_XIDTAG_LOCKMETHOD
+ item.tag.lockmethod = lockmethod;
+#endif
+#ifdef USER_LOCKS
+ is_user_lock = (lockmethod == 2);
+ if (is_user_lock) {
+ item.tag.pid = MyProcPid;
+ item.tag.xid = 0;
+ } else {
+ TransactionIdStore(xid, &item.tag.xid);
+ }
+#else
+ TransactionIdStore(xid, &item.tag.xid);
#endif
- if (!(result = (XIDLookupEnt *)
- hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
- {
- elog(NOTICE, "LockResolveConflicts: xid table corrupted");
- return (STATUS_ERROR);
- }
- myHolders = result->holders;
+ /*
+ * Find or create an xid entry with this tag
+ */
+ result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+ HASH_ENTER, &found);
+ if (!result)
+ {
+ elog(NOTICE, "LockResolveConflicts: xid table corrupted");
+ return (STATUS_ERROR);
+ }
- if (!found)
- {
- /* ---------------
- * we're not holding any type of lock yet. Clear
- * the lock stats.
- * ---------------
+ /*
+ * If not found initialize the new entry. THIS SHOULD NEVER HAPPEN,
+ * if we are trying to resolve a conflict we must already have
+ * allocated an xid entry for this lock. dz 21-11-1997
*/
- MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
- result->nHolding = 0;
+ if (!found)
+ {
+ /* ---------------
+ * we're not holding any type of lock yet. Clear
+ * the lock stats.
+ * ---------------
+ */
+ MemSet(result->holders,0, numLockModes * sizeof(*(lock->holders)));
+ result->nHolding = 0;
+ XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
+ } else {
+ XID_PRINT("LockResolveConflicts: found", result);
+ }
}
+ Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
+ /*
+ * We can control runtime this option. Default is lockReadPriority=0
+ */
+ if (!lockReadPriority)
{
/* ------------------------
* If someone with a greater priority is waiting for the lock,
@@ -705,8 +822,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
PROC_QUEUE *waitQueue = &(lock->waitProcs);
PROC *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
- if (waitQueue->size && topproc->prio > myprio)
+ if (waitQueue->size && topproc->prio > myprio) {
+ XID_PRINT("LockResolveConflicts: higher priority proc waiting",
+ result);
return STATUS_FOUND;
+ }
}
/* ----------------------------
@@ -721,12 +841,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
*/
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
{
-
result->holders[lockmode]++;
result->nHolding++;
-
- XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+ XID_PRINT("LockResolveConflicts: no conflict", result);
+ Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
return (STATUS_OK);
}
@@ -736,6 +854,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* that does not reflect our own locks.
* ------------------------
*/
+ myHolders = result->holders;
bitmask = 0;
tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
@@ -753,27 +872,42 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
*/
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
{
-
/* no conflict. Get the lock and go on */
-
result->holders[lockmode]++;
result->nHolding++;
-
- XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+ XID_PRINT("LockResolveConflicts: resolved", result);
+ Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
return (STATUS_OK);
-
}
+ XID_PRINT("LockResolveConflicts: conflicting", result);
return (STATUS_FOUND);
}
+/*
+ * GrantLock -- update the lock data structure to show
+ * the new lock holder.
+ */
+void
+GrantLock(LOCK *lock, LOCKMODE lockmode)
+{
+ lock->nActive++;
+ lock->activeHolders[lockmode]++;
+ lock->mask |= BITS_ON[lockmode];
+ LOCK_PRINT("GrantLock", lock, lockmode);
+ Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
+ Assert(lock->nActive <= lock->nHolding);
+}
+
static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
+WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+ TransactionId xid)
{
PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
int prio = lockMethodTable->ctl->prio[lockmode];
+ char old_status[64],
+ new_status[64];
Assert(lockmethod < NumLockMethods);
@@ -785,27 +919,38 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
* synchronization for this queue. That will not be true if/when
* people can be deleted from the queue by a SIGINT or something.
*/
- LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+ LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+ strcpy(old_status, PS_STATUS);
+ strcpy(new_status, PS_STATUS);
+ strcat(new_status, " waiting");
+ PS_SET_STATUS(new_status);
if (ProcSleep(waitQueue,
lockMethodTable->ctl->masterLock,
lockmode,
prio,
- lock) != NO_ERROR)
+ lock,
+ xid) != NO_ERROR)
{
/* -------------------
- * This could have happend as a result of a deadlock, see HandleDeadLock()
- * Decrement the lock nHolding and holders fields as we are no longer
- * waiting on this lock.
+ * This could have happend as a result of a deadlock,
+ * see HandleDeadLock().
+ * Decrement the lock nHolding and holders fields as
+ * we are no longer waiting on this lock.
* -------------------
*/
lock->nHolding--;
lock->holders[lockmode]--;
- LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+ LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+ Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+ Assert(lock->nActive <= lock->nHolding);
SpinRelease(lockMethodTable->ctl->masterLock);
elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
+
+ /* not reached */
}
- LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
+ PS_SET_STATUS(old_status);
+ LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
return (STATUS_OK);
}
@@ -829,25 +974,34 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
XIDLookupEnt *result,
item;
HTAB *xidTable;
+ TransactionId xid;
bool wakeupNeeded = true;
+ int trace_flag;
#ifdef USER_LOCKS
int is_user_lock;
- is_user_lock = (lockmethod == 0);
+ is_user_lock = (lockmethod == USER_LOCKMETHOD);
if (is_user_lock)
{
- lockmethod = 1;
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
- locktag->tupleId.ip_posid,
- ((locktag->tupleId.ip_blkid.bi_hi << 16) +
- locktag->tupleId.ip_blkid.bi_lo),
- lockmode);
-#endif
+ TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u,%u] %d",
+ locktag->tupleId.ip_posid,
+ ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+ locktag->tupleId.ip_blkid.bi_lo),
+ lockmode);
}
#endif
+ /* ???????? This must be changed when short term locks will be used */
+ locktag->lockmethod = lockmethod;
+
+#ifdef USER_LOCKS
+ trace_flag = \
+ (lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+ trace_flag = TRACE_LOCKS;
+#endif
+
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
@@ -859,34 +1013,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (LockingIsDisabled)
return (TRUE);
- LOCK_PRINT("Release", locktag, lockmode);
-
masterLock = lockMethodTable->ctl->masterLock;
- xidTable = lockMethodTable->xidHash;
-
SpinAcquire(masterLock);
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *)
- hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found);
-
-#ifdef USER_LOCKS
-
/*
- * If the entry is not found hash_search returns TRUE instead of NULL,
- * so we must check it explicitly.
+ * Find a lock with this tag
*/
- if ((is_user_lock) && (lock == (LOCK *) TRUE))
- {
- SpinRelease(masterLock);
- elog(NOTICE, "LockRelease: there are no locks with this tag");
- return (FALSE);
- }
-#endif
+ Assert(lockMethodTable->lockHash->hash == tag_hash);
+ lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+ HASH_FIND, &found);
/*
- * let the caller print its own error message, too. Do not
- * elog(ERROR).
+ * let the caller print its own error message, too. Do not elog(ERROR).
*/
if (!lock)
{
@@ -898,52 +1036,20 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (!found)
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ if (is_user_lock) {
+ TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+ return (FALSE);
+ }
+#endif
elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
return (FALSE);
}
+ LOCK_PRINT("LockRelease: found", lock, lockmode);
+ Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+ Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+ Assert(lock->nActive <= lock->nHolding);
- Assert(lock->nHolding > 0);
-
-#ifdef USER_LOCKS
-
- /*
- * If this is an user lock it can be removed only after checking that
- * it was acquired by the current process, so this code is skipped and
- * executed later.
- */
- if (!is_user_lock)
- {
-#endif
-
- /*
- * fix the general lock stats
- */
- lock->nHolding--;
- lock->holders[lockmode]--;
- lock->nActive--;
- lock->activeHolders[lockmode]--;
-
- Assert(lock->nActive >= 0);
-
- if (!lock->nHolding)
- {
- /* ------------------
- * if there's no one waiting in the queue,
- * we just released the last lock.
- * Delete it from the lock table.
- * ------------------
- */
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
- (Pointer) &(lock->tag),
- HASH_REMOVE_SAVED,
- &found);
- Assert(lock && found);
- wakeupNeeded = false;
- }
-#ifdef USER_LOCKS
- }
-#endif
/* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long
@@ -951,42 +1057,102 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* ------------------
*/
MemSet(&item, 0, XID_TAGSIZE);
-
- TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
item.tag.lock = MAKE_OFFSET(lock);
-#if 0
- item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+ item.tag.lockmethod = lockmethod;
#endif
-
#ifdef USER_LOCKS
- if (is_user_lock)
- {
+ if (is_user_lock) {
item.tag.pid = MyProcPid;
- item.tag.xid = 0;
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
- item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+ item.tag.xid = xid = 0;
+ } else {
+ xid = GetCurrentTransactionId();
+ TransactionIdStore(xid, &item.tag.xid);
}
+#else
+ xid = GetCurrentTransactionId();
+ TransactionIdStore(xid, &item.tag.xid);
#endif
- if (!(result = (XIDLookupEnt *) hash_search(xidTable,
- (Pointer) &item,
- HASH_FIND_SAVE,
- &found))
- || !found)
+ /*
+ * Find an xid entry with this tag
+ */
+ xidTable = lockMethodTable->xidHash;
+ result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+ HASH_FIND_SAVE, &found);
+ if (!result || !found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
- if ((is_user_lock) && (result))
- elog(NOTICE, "LockRelease: you don't have a lock on this tag");
- else
- elog(NOTICE, "LockRelease: find xid, table corrupted");
-#else
- elog(NOTICE, "LockReplace: xid table corrupted");
+ if (!found && is_user_lock) {
+ TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+ } else
#endif
+ elog(NOTICE, "LockReplace: xid table corrupted");
return (FALSE);
}
+ XID_PRINT("LockRelease: found", result);
+ Assert(result->tag.lock == MAKE_OFFSET(lock));
+
+ /*
+ * Check that we are actually holding a lock of the type we want
+ * to release.
+ */
+ if (!(result->holders[lockmode] > 0)) {
+ SpinRelease(masterLock);
+ XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
+ elog(NOTICE, "LockRelease: you don't own a lock of type %s",
+ lock_types[lockmode]);
+ Assert(result->holders[lockmode] >= 0);
+ return (FALSE);
+ }
+ Assert(result->nHolding > 0);
+
+ /*
+ * fix the general lock stats
+ */
+ lock->nHolding--;
+ lock->holders[lockmode]--;
+ lock->nActive--;
+ lock->activeHolders[lockmode]--;
+
+ /* --------------------------
+ * If there are still active locks of the type I just released, no one
+ * should be woken up. Whoever is asleep will still conflict
+ * with the remaining locks.
+ * --------------------------
+ */
+ if (lock->activeHolders[lockmode])
+ {
+ wakeupNeeded = false;
+ }
+ else
+ {
+ /* change the conflict mask. No more of this lock type. */
+ lock->mask &= BITS_OFF[lockmode];
+ }
+
+ LOCK_PRINT("LockRelease: updated", lock, lockmode);
+ Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+ Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
+ Assert(lock->nActive <= lock->nHolding);
+
+ if (!lock->nHolding)
+ {
+ /* ------------------
+ * if there's no one waiting in the queue,
+ * we just released the last lock.
+ * Delete it from the lock table.
+ * ------------------
+ */
+ Assert(lockMethodTable->lockHash->hash == tag_hash);
+ lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+ (Pointer) &(lock->tag),
+ HASH_REMOVE,
+ &found);
+ Assert(lock && found);
+ wakeupNeeded = false;
+ }
/*
* now check to see if I have any private locks. If I do, decrement
@@ -994,8 +1160,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
result->holders[lockmode]--;
result->nHolding--;
-
- XID_PRINT("LockRelease updated xid stats", result);
+ XID_PRINT("LockRelease: updated", result);
+ Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
/*
* If this was my last hold on this lock, delete my entry in the XID
@@ -1003,78 +1169,25 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
if (!result->nHolding)
{
-#ifdef USER_LOCKS
- if (result->queue.prev == INVALID_OFFSET)
+ if (result->queue.prev == INVALID_OFFSET) {
elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
- if (result->queue.next == INVALID_OFFSET)
+ }
+ if (result->queue.next == INVALID_OFFSET) {
elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
-#endif
+ }
if (result->queue.next != INVALID_OFFSET)
SHMQueueDelete(&result->queue);
- if (!(result = (XIDLookupEnt *)
- hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
- !found)
+ XID_PRINT("LockRelease: deleting", result);
+ result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
+ HASH_REMOVE_SAVED, &found);
+ if (!result || !found)
{
SpinRelease(masterLock);
-#ifdef USER_LOCKS
elog(NOTICE, "LockRelease: remove xid, table corrupted");
-#else
- elog(NOTICE, "LockReplace: xid table corrupted");
-#endif
return (FALSE);
}
}
-#ifdef USER_LOCKS
-
- /*
- * If this is an user lock remove it now, after the corresponding xid
- * entry has been found and deleted.
- */
- if (is_user_lock)
- {
-
- /*
- * fix the general lock stats
- */
- lock->nHolding--;
- lock->holders[lockmode]--;
- lock->nActive--;
- lock->activeHolders[lockmode]--;
-
- Assert(lock->nActive >= 0);
-
- if (!lock->nHolding)
- {
- /* ------------------
- * if there's no one waiting in the queue,
- * we just released the last lock.
- * Delete it from the lock table.
- * ------------------
- */
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
- (Pointer) &(lock->tag),
- HASH_REMOVE,
- &found);
- Assert(lock && found);
- wakeupNeeded = false;
- }
- }
-#endif
-
- /* --------------------------
- * If there are still active locks of the type I just released, no one
- * should be woken up. Whoever is asleep will still conflict
- * with the remaining locks.
- * --------------------------
- */
- if (!(lock->activeHolders[lockmode]))
- {
- /* change the conflict mask. No more of this lock type. */
- lock->mask &= BITS_OFF[lockmode];
- }
-
if (wakeupNeeded)
{
/* --------------------------
@@ -1085,35 +1198,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
}
+ else
+ {
+ LOCK_TPRINTF(lock, "LockRelease: no wakeup needed");
+ }
SpinRelease(masterLock);
return (TRUE);
}
/*
- * GrantLock -- update the lock data structure to show
- * the new lock holder.
- */
-void
-GrantLock(LOCK *lock, LOCKMODE lockmode)
-{
- lock->nActive++;
- lock->activeHolders[lockmode]++;
- lock->mask |= BITS_ON[lockmode];
-}
-
-#ifdef USER_LOCKS
-/*
* LockReleaseAll -- Release all locks in a process lock queue.
- *
- * Note: This code is a little complicated by the presence in the
- * same queue of user locks which can't be removed from the
- * normal lock queue at the end of a transaction. They must
- * however be removed when the backend exits.
- * A dummy lockmethod 0 is used to indicate that we are releasing
- * the user locks, from the code added to ProcKill().
*/
-#endif
bool
LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
{
@@ -1121,6 +1217,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
int done;
XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL;
+ XIDLookupEnt *result;
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
@@ -1128,45 +1225,52 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
numLockModes;
LOCK *lock;
bool found;
+ int trace_flag;
+ int xidtag_lockmethod;
#ifdef USER_LOCKS
int is_user_lock_table,
count,
- nskip;
+ nleft;
- is_user_lock_table = (lockmethod == 0);
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
-#endif
- if (is_user_lock_table)
- lockmethod = 1;
+ count = nleft = 0;
+
+ is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
+ trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+ trace_flag = TRACE_LOCKS;
#endif
+ TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
+ lockmethod, MyProcPid);
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
- if (!lockMethodTable)
+ if (!lockMethodTable) {
+ elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
return (FALSE);
-
- numLockModes = lockMethodTable->ctl->numLockModes;
- masterLock = lockMethodTable->ctl->masterLock;
+ }
if (SHMQueueEmpty(lockQueue))
return TRUE;
-#ifdef USER_LOCKS
+ numLockModes = lockMethodTable->ctl->numLockModes;
+ masterLock = lockMethodTable->ctl->masterLock;
+
SpinAcquire(masterLock);
-#endif
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
- XID_PRINT("LockReleaseAll", xidLook);
-
-#ifndef USER_LOCKS
- SpinAcquire(masterLock);
-#else
- count = nskip = 0;
-#endif
for (;;)
{
+ /*
+ * Sometimes the queue appears to be messed up.
+ */
+ if (count++ > 1000)
+ {
+ elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
+ nleft = 0;
+ break;
+ }
+
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
@@ -1176,72 +1280,73 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
- LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
-
-#ifdef USER_LOCKS
+ xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
+ if ((xidtag_lockmethod == lockmethod) || (trace_flag >= 2)) {
+ XID_PRINT("LockReleaseAll", xidLook);
+ LOCK_PRINT("LockReleaseAll", lock, 0);
+ }
- /*
- * Sometimes the queue appears to be messed up.
- */
- if (count++ > 2000)
- {
- elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
- nskip = 0;
- break;
+#ifdef USE_XIDTAG_LOCKMETHOD
+ if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
+ elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
+ xidtag_lockmethod, lock->tag.lockmethod);
+#endif
+ if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2)) {
+ TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
+ nleft++;
+ goto next_item;
}
+
+ Assert(lock->nHolding > 0);
+ Assert(lock->nActive > 0);
+ Assert(lock->nActive <= lock->nHolding);
+ Assert(xidLook->nHolding >= 0);
+ Assert(xidLook->nHolding <= lock->nHolding);
+
+#ifdef USER_LOCKS
if (is_user_lock_table)
{
if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
{
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
- nskip++;
+ TPRINTF(TRACE_USERLOCKS,
+ "LockReleaseAll: skiping normal lock [%d,%d,%d]",
+ xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+ nleft++;
goto next_item;
}
if (xidLook->tag.pid != MyProcPid)
{
- /* This should never happen */
-#ifdef USER_LOCKS_DEBUG
+ /* Should never happen */
elog(NOTICE,
- "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
+ "LockReleaseAll: INVALID PID: [%u,%u] [%d,%d,%d]",
lock->tag.tupleId.ip_posid,
((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
lock->tag.tupleId.ip_blkid.bi_lo),
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
- nskip++;
+ xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+ nleft++;
goto next_item;
}
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE,
- "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
- lock->tag.tupleId.ip_posid,
- ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
- lock->tag.tupleId.ip_blkid.bi_lo),
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
+ TPRINTF(TRACE_USERLOCKS,
+ "LockReleaseAll: releasing user lock [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
}
else
{
- if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
+ /* Can't check xidLook->tag.xid, can be 0 also for normal locks */
+ if (xidLook->tag.pid != 0)
{
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE,
- "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
- lock->tag.tupleId.ip_posid,
- ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
- lock->tag.tupleId.ip_blkid.bi_lo),
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
- nskip++;
+ TPRINTF(TRACE_LOCKS,
+ "LockReleaseAll: skiping user lock [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+ nleft++;
goto next_item;
}
-#ifdef USER_LOCKS_DEBUG
- elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
}
#endif
@@ -1251,16 +1356,20 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
*/
if (lock->nHolding != xidLook->nHolding)
{
- lock->nHolding -= xidLook->nHolding;
- lock->nActive -= xidLook->nHolding;
- Assert(lock->nActive >= 0);
for (i = 1; i <= numLockModes; i++)
{
+ Assert(xidLook->holders[i] >= 0);
lock->holders[i] -= xidLook->holders[i];
lock->activeHolders[i] -= xidLook->holders[i];
+ Assert((lock->holders[i] >= 0) \
+ && (lock->activeHolders[i] >= 0));
if (!lock->activeHolders[i])
lock->mask &= BITS_OFF[i];
}
+ lock->nHolding -= xidLook->nHolding;
+ lock->nActive -= xidLook->nHolding;
+ Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
+ Assert(lock->nActive <= lock->nHolding);
}
else
{
@@ -1270,16 +1379,30 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
* --------------
*/
lock->nHolding = 0;
+ /* Fix the lock status, just for next LOCK_PRINT message. */
+ for (i=1; i<=numLockModes; i++) {
+ Assert(lock->holders[i] == lock->activeHolders[i]);
+ lock->holders[i] = lock->activeHolders[i] = 0;
+ }
}
+ LOCK_PRINT("LockReleaseAll: updated", lock, 0);
+
+ /*
+ * Remove the xid from the process lock queue
+ */
+ SHMQueueDelete(&xidLook->queue);
+
/* ----------------
* always remove the xidLookup entry, we're done with it now
* ----------------
*/
-#ifdef USER_LOCKS
- SHMQueueDelete(&xidLook->queue);
-#endif
- if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
- || !found)
+
+ XID_PRINT("LockReleaseAll: deleting", xidLook);
+ result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
+ (Pointer) xidLook,
+ HASH_REMOVE,
+ &found);
+ if (!result || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockReleaseAll: xid table corrupted");
@@ -1293,10 +1416,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
* the last lock.
* --------------------
*/
-
+ LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *)
- hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
+ lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+ (Pointer) &(lock->tag),
+ HASH_REMOVE, &found);
if ((!lock) || (!found))
{
SpinRelease(masterLock);
@@ -1324,15 +1448,18 @@ next_item:
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
xidLook = tmp;
}
- SpinRelease(masterLock);
-#ifdef USER_LOCKS
/*
* Reinitialize the queue only if nothing has been left in.
*/
- if (nskip == 0)
-#endif
+ if (nleft == 0) {
+ TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
SHMQueueInit(lockQueue);
+ }
+
+ SpinRelease(masterLock);
+ TPRINTF(trace_flag, "LockReleaseAll: done");
+
return TRUE;
}
@@ -1420,7 +1547,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
checked_procs[0] = MyProc;
nprocs = 1;
- lockMethodTable = LockMethodTable[1];
+ lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
xidTable = lockMethodTable->xidHash;
MemSet(&item, 0, XID_TAGSIZE);
@@ -1456,7 +1583,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
- LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);
+ LOCK_PRINT("DeadLockCheck", lock, 0);
/*
* This is our only check to see if we found the lock we want.
@@ -1560,9 +1687,190 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
return false;
}
+/*
+ * Return an array with the pids of all processes owning a lock.
+ * This works only for user locks because normal locks have no
+ * pid information in the corresponding XIDLookupEnt.
+ */
+ArrayType *
+LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
+{
+ XIDLookupEnt *xidLook = NULL;
+ SPINLOCK masterLock;
+ LOCK *lock;
+ SHMEM_OFFSET lock_offset;
+ int count = 0;
+ LOCKMETHODTABLE *lockMethodTable;
+ HTAB *xidTable;
+ bool found;
+ int ndims,
+ nitems,
+ hdrlen,
+ size;
+ int lbounds[1],
+ hbounds[1];
+ ArrayType *array;
+ int *data_ptr;
+
+ /* Assume that no one will modify the result */
+ static int empty_array[] = { 20, 1, 0, 0, 0 };
+
+#ifdef USER_LOCKS
+ int is_user_lock;
+
+ is_user_lock = (lockmethod == USER_LOCKMETHOD);
+ if (is_user_lock)
+ {
+ TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u,%u]",
+ locktag->tupleId.ip_posid,
+ ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+ locktag->tupleId.ip_blkid.bi_lo));
+ }
+#endif
+
+ /* This must be changed when short term locks will be used */
+ locktag->lockmethod = lockmethod;
+
+ Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
+ lockMethodTable = LockMethodTable[lockmethod];
+ if (!lockMethodTable)
+ {
+ elog(NOTICE, "lockMethodTable is null in LockOwners");
+ return ((ArrayType *) &empty_array);
+ }
+
+ if (LockingIsDisabled)
+ {
+ return ((ArrayType *) &empty_array);
+ }
+
+ masterLock = lockMethodTable->ctl->masterLock;
+ SpinAcquire(masterLock);
+
+ /*
+ * Find a lock with this tag
+ */
+ Assert(lockMethodTable->lockHash->hash == tag_hash);
+ lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+ HASH_FIND, &found);
+
+ /*
+ * let the caller print its own error message, too. Do not elog(WARN).
+ */
+ if (!lock)
+ {
+ SpinRelease(masterLock);
+ elog(NOTICE, "LockOwners: locktable corrupted");
+ return ((ArrayType *) &empty_array);
+ }
+
+ if (!found)
+ {
+ SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ if (is_user_lock) {
+ TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
+ return ((ArrayType *) &empty_array);
+ }
+#endif
+ elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
+ return ((ArrayType *) &empty_array);
+ }
+ LOCK_PRINT("LockOwners: found", lock, 0);
+ Assert((lock->nHolding > 0) && (lock->nActive > 0));
+ Assert(lock->nActive <= lock->nHolding);
+ lock_offset = MAKE_OFFSET(lock);
+
+ /* Construct a 1-dimensional array */
+ ndims = 1;
+ hdrlen = ARR_OVERHEAD(ndims);
+ lbounds[0] = 0;
+ hbounds[0] = lock->nActive;
+ size = hdrlen + sizeof(int) * hbounds[0];
+ array = (ArrayType *) palloc(size);
+ MemSet(array, 0, size);
+ memmove((char *) array, (char *) &size, sizeof(int));
+ memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
+ memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+ memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
+ SET_LO_FLAG(false, array);
+ data_ptr = (int *) ARR_DATA_PTR(array);
+
+ xidTable = lockMethodTable->xidHash;
+ hash_seq(NULL);
+ nitems = 0;
+ while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+ (xidLook != (XIDLookupEnt *)TRUE)) {
+ if (count++ > 1000) {
+ elog(NOTICE,"LockOwners: possible loop, giving up");
+ break;
+ }
+
+ if (xidLook->tag.pid == 0) {
+ XID_PRINT("LockOwners: no pid", xidLook);
+ continue;
+ }
+
+ if (!xidLook->tag.lock) {
+ XID_PRINT("LockOwners: NULL LOCK", xidLook);
+ continue;
+ }
+
+ if (xidLook->tag.lock != lock_offset) {
+ XID_PRINT("LockOwners: different lock", xidLook);
+ continue;
+ }
+
+ if (LOCK_LOCKMETHOD(*lock) != lockmethod) {
+ XID_PRINT("LockOwners: other table", xidLook);
+ continue;
+ }
+
+ if (xidLook->nHolding <= 0) {
+ XID_PRINT("LockOwners: not holding", xidLook);
+ continue;
+ }
+
+ if (nitems >= hbounds[0]) {
+ elog(NOTICE,"LockOwners: array size exceeded");
+ break;
+ }
+
+ /*
+ * Check that the holding process is still alive by sending
+ * him an unused (ignored) signal. If the kill fails the
+ * process is not alive.
+ */
+ if ((xidLook->tag.pid != MyProcPid) \
+ && (kill(xidLook->tag.pid, SIGCHLD)) != 0)
+ {
+ /* Return a negative pid to signal that process is dead */
+ data_ptr[nitems++] = - (xidLook->tag.pid);
+ XID_PRINT("LockOwners: not alive", xidLook);
+ /* XXX - TODO: remove this entry and update lock stats */
+ continue;
+ }
+
+ /* Found a process holding the lock */
+ XID_PRINT("LockOwners: holding", xidLook);
+ data_ptr[nitems++] = xidLook->tag.pid;
+ }
+
+ SpinRelease(masterLock);
+
+ /* Adjust the actual size of the array */
+ hbounds[0] = nitems;
+ size = hdrlen + sizeof(int) * hbounds[0];
+ memmove((char *) array, (char *) &size, sizeof(int));
+ memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+
+ return (array);
+}
+
#ifdef DEADLOCK_DEBUG
/*
- * Dump all locks. Must have already acquired the masterLock.
+ * Dump all locks in the proc->lockQueue. Must have already acquired
+ * the masterLock.
*/
void
DumpLocks()
@@ -1577,9 +1885,8 @@ DumpLocks()
SPINLOCK masterLock;
int numLockModes;
LOCK *lock;
-
- count;
- int lockmethod = 1;
+ int count = 0;
+ int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
ShmemPIDLookup(MyProcPid, &location);
@@ -1604,11 +1911,18 @@ DumpLocks()
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
end = MAKE_OFFSET(lockQueue);
- LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
- XID_PRINT("DumpLocks", xidLook);
+ if (MyProc->waitLock) {
+ LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
+ }
- for (count = 0;;)
+ for (;;)
{
+ if (count++ > 2000)
+ {
+ elog(NOTICE, "DumpLocks: xid loop detected, giving up");
+ break;
+ }
+
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
@@ -1618,19 +1932,68 @@ DumpLocks()
done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
- LOCK_DUMP("DumpLocks", lock, 0);
-
- if (count++ > 2000)
- {
- elog(NOTICE, "DumpLocks: xid loop detected, giving up");
- break;
- }
+ XID_PRINT_AUX("DumpLocks", xidLook);
+ LOCK_PRINT_AUX("DumpLocks", lock, 0);
if (done)
break;
+
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
xidLook = tmp;
}
}
+/*
+ * Dump all postgres locks. Must have already acquired the masterLock.
+ */
+void
+DumpAllLocks()
+{
+ SHMEM_OFFSET location;
+ PROC *proc;
+ XIDLookupEnt *xidLook = NULL;
+ LOCK *lock;
+ int pid;
+ int count = 0;
+ int lockmethod = DEFAULT_LOCKMETHOD;
+ LOCKMETHODTABLE *lockMethodTable;
+ HTAB *xidTable;
+
+ pid = getpid();
+ ShmemPIDLookup(pid,&location);
+ if (location == INVALID_OFFSET)
+ return;
+ proc = (PROC *) MAKE_PTR(location);
+ if (proc != MyProc)
+ return;
+
+ Assert(lockmethod < NumLockMethods);
+ lockMethodTable = LockMethodTable[lockmethod];
+ if (!lockMethodTable)
+ return;
+
+ xidTable = lockMethodTable->xidHash;
+
+ if (MyProc->waitLock) {
+ LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock,0);
+ }
+
+ hash_seq(NULL);
+ while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+ (xidLook != (XIDLookupEnt *)TRUE)) {
+ XID_PRINT_AUX("DumpAllLocks", xidLook);
+
+ if (xidLook->tag.lock) {
+ lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+ LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
+ } else {
+ elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
+ }
+
+ if (count++ > 2000) {
+ elog(NOTICE,"DumpAllLocks: possible loop, giving up");
+ break;
+ }
+ }
+}
#endif
diff --git a/src/backend/storage/lmgr/multi.c b/src/backend/storage/lmgr/multi.c
index 0998389ffe..3887a8e37d 100644
--- a/src/backend/storage/lmgr/multi.c
+++ b/src/backend/storage/lmgr/multi.c
@@ -12,7 +12,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.22 1998/08/19 02:02:44 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.23 1998/08/25 21:20:28 scrappy Exp $
*
* NOTES:
* (1) The lock.c module assumes that the caller here is doing
@@ -29,12 +29,10 @@
#include "utils/rel.h"
#include "miscadmin.h" /* MyDatabaseId */
-static bool
-MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
- PG_LOCK_LEVEL level);
-static bool
-MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
- PG_LOCK_LEVEL level);
+static bool MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag,
+ LOCKMODE lockmode, PG_LOCK_LEVEL level);
+static bool MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag,
+ LOCKMODE lockmode, PG_LOCK_LEVEL level);
#ifdef LowLevelLocking
@@ -130,6 +128,7 @@ static int MultiPrios[] = {
* lock table is ONE lock table, not three.
*/
LOCKMETHOD MultiTableId = (LOCKMETHOD) NULL;
+LOCKMETHOD LongTermTableId = (LOCKMETHOD) NULL;
#ifdef NOT_USED
LOCKMETHOD ShortTermTableId = (LOCKMETHOD) NULL;
#endif
@@ -150,12 +149,25 @@ InitMultiLevelLocks()
/* -----------------------
* No short term lock table for now. -Jeff 15 July 1991
*
- * ShortTermTableId = LockTableRename(lockmethod);
+ * ShortTermTableId = LockMethodTableRename(lockmethod);
* if (! (ShortTermTableId)) {
* elog(ERROR,"InitMultiLocks: couldnt rename lock table");
* }
* -----------------------
*/
+
+#ifdef USER_LOCKS
+ /*
+ * Allocate another tableId for long-term locks
+ */
+ LongTermTableId = LockMethodTableRename(MultiTableId);
+ if (!(LongTermTableId))
+ {
+ elog(ERROR,
+ "InitMultiLevelLocks: couldn't rename long-term lock table");
+ }
+#endif
+
return MultiTableId;
}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index fb052582e7..4cd9602a8b 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
*
*-------------------------------------------------------------------------
*/
@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
*/
#include <sys/time.h>
#include <unistd.h>
@@ -75,10 +75,13 @@
#include "storage/shmem.h"
#include "storage/spin.h"
#include "storage/proc.h"
+#include "utils/trace.h"
static void HandleDeadLock(int sig);
static PROC *ProcWakeup(PROC *proc, int errType);
+#define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
+
/* --------------------
* Spin lock for manipulating the shared process data structure:
* ProcGlobal.... Adding an extra spin lock seemed like the smallest
@@ -247,10 +250,7 @@ InitProcess(IPCKey key)
*/
SpinRelease(ProcStructLock);
- MyProc->pid = 0;
-#if 0
MyProc->pid = MyProcPid;
-#endif
MyProc->xid = InvalidTransactionId;
#ifdef LowLevelLocking
MyProc->xmin = InvalidTransactionId;
@@ -361,10 +361,13 @@ ProcKill(int exitStatus, int pid)
* ---------------
*/
ProcReleaseSpins(proc);
- LockReleaseAll(1, &proc->lockQueue);
+ LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
#ifdef USER_LOCKS
- LockReleaseAll(0, &proc->lockQueue);
+ /*
+ * Assume we have a second lock table.
+ */
+ LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
#endif
/* ----------------
@@ -437,11 +440,12 @@ ProcQueueInit(PROC_QUEUE *queue)
* NOTES: The process queue is now a priority queue for locking.
*/
int
-ProcSleep(PROC_QUEUE *waitQueue,
+ProcSleep(PROC_QUEUE *waitQueue, /* lock->waitProcs */
SPINLOCK spinlock,
- int token,
+ int token, /* lockmode */
int prio,
- LOCK *lock)
+ LOCK *lock,
+ TransactionId xid) /* needed by user locks, see below */
{
int i;
PROC *proc;
@@ -470,7 +474,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/* If we are a reader, and they are writers, skip past them */
-
for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
proc = (PROC *) MAKE_PTR(proc->links.prev);
@@ -482,6 +485,15 @@ ProcSleep(PROC_QUEUE *waitQueue,
MyProc->token = token;
MyProc->waitLock = lock;
+#ifdef USER_LOCKS
+ /* -------------------
+ * Currently, we only need this for the ProcWakeup routines.
+ * This must be 0 for user lock, so we can't just use the value
+ * from GetCurrentTransactionId().
+ * -------------------
+ */
+ TransactionIdStore(xid, &MyProc->xid);
+#else
#ifndef LowLevelLocking
/* -------------------
* currently, we only need this for the ProcWakeup routines
@@ -489,6 +501,7 @@ ProcSleep(PROC_QUEUE *waitQueue,
*/
TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
#endif
+#endif
/* -------------------
* assume that these two operations are atomic (because
@@ -510,7 +523,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
* --------------
*/
MemSet(&timeval, 0, sizeof(struct itimerval));
- timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
+ timeval.it_value.tv_sec = \
+ (DeadlockCheckTimer ? DeadlockCheckTimer : DEADLOCK_CHECK_TIMER);
do
{
@@ -525,7 +539,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
* the semaphore implementation.
* --------------
*/
- IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+ IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum,
+ IpcExclusiveLock);
} while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock
* check */
@@ -534,8 +549,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
* ---------------
*/
timeval.it_value.tv_sec = 0;
-
-
if (setitimer(ITIMER_REAL, &timeval, &dummy))
elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
@@ -546,6 +559,11 @@ ProcSleep(PROC_QUEUE *waitQueue,
*/
SpinAcquire(spinlock);
+#ifdef LOCK_MGR_DEBUG
+ /* Just to get meaningful debug messages from DumpLocks() */
+ MyProc->waitLock = (LOCK *)NULL;
+#endif
+
return (MyProc->errType);
}
@@ -589,17 +607,39 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
{
PROC *proc;
int count;
+ int trace_flag;
+ int last_locktype = -1;
+ int queue_size = queue->size;
+
+ Assert(queue->size >= 0);
if (!queue->size)
return (STATUS_NOT_FOUND);
proc = (PROC *) MAKE_PTR(queue->links.prev);
count = 0;
- while ((LockResolveConflicts(lockmethod,
+ while ((queue_size--) && (proc))
+ {
+ /*
+ * This proc will conflict as the previous one did, don't even try.
+ */
+ if (proc->token == last_locktype)
+ {
+ continue;
+ }
+
+ /*
+ * This proc conflicts with locks held by others, ignored.
+ */
+ if (LockResolveConflicts(lockmethod,
lock,
proc->token,
- proc->xid) == STATUS_OK))
- {
+ proc->xid,
+ (XIDLookupEnt *) NULL) != STATUS_OK)
+ {
+ last_locktype = proc->token;
+ continue;
+ }
/*
* there was a waiting process, grant it the lock before waking it
@@ -608,24 +648,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
* time that the awoken process begins executing again.
*/
GrantLock(lock, proc->token);
- queue->size--;
/*
* ProcWakeup removes proc from the lock waiting process queue and
* returns the next proc in chain.
*/
- proc = ProcWakeup(proc, NO_ERROR);
count++;
- if (!proc || queue->size == 0)
- break;
+ queue->size--;
+ proc = ProcWakeup(proc, NO_ERROR);
}
+ Assert(queue->size >= 0);
+
if (count)
return (STATUS_OK);
- else
+ else {
/* Something is still blocking us. May have deadlocked. */
+ trace_flag = (lock->tag.lockmethod == USER_LOCKMETHOD) ? \
+ TRACE_USERLOCKS : TRACE_LOCKS;
+ TPRINTF(trace_flag,
+ "ProcLockWakeup: lock(%x) can't wake up any process",
+ MAKE_OFFSET(lock));
+#ifdef DEADLOCK_DEBUG
+ if (pg_options[trace_flag] >= 2)
+ DumpAllLocks();
+#endif
return (STATUS_NOT_FOUND);
+ }
}
void
@@ -685,7 +735,7 @@ HandleDeadLock(int sig)
}
#ifdef DEADLOCK_DEBUG
- DumpLocks();
+ DumpAllLocks();
#endif
if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
@@ -711,7 +761,8 @@ HandleDeadLock(int sig)
* I was awoken by a signal, not by someone unlocking my semaphore.
* ------------------
*/
- IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+ IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum,
+ IpcExclusiveLock);
/* -------------
* Set MyProc->errType to STATUS_ERROR so that we abort after