summaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/deadlock.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/deadlock.c')
-rw-r--r--src/backend/storage/lmgr/deadlock.c188
1 files changed, 107 insertions, 81 deletions
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index 31db44e74b..160fc64fb2 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -12,7 +12,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.2 2001/01/25 03:45:50 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.3 2001/03/22 03:59:46 momjian Exp $
*
* Interface:
*
@@ -29,32 +29,36 @@
/* One edge in the waits-for graph */
-typedef struct {
- PROC *waiter; /* the waiting process */
- PROC *blocker; /* the process it is waiting for */
- int pred; /* workspace for TopoSort */
- int link; /* workspace for TopoSort */
+typedef struct
+{
+ PROC *waiter; /* the waiting process */
+ PROC *blocker; /* the process it is waiting for */
+ int pred; /* workspace for TopoSort */
+ int link; /* workspace for TopoSort */
} EDGE;
/* One potential reordering of a lock's wait queue */
-typedef struct {
- LOCK *lock; /* the lock whose wait queue is described */
- PROC **procs; /* array of PROC *'s in new wait order */
- int nProcs;
+typedef struct
+{
+ LOCK *lock; /* the lock whose wait queue is described */
+ PROC **procs; /* array of PROC *'s in new wait order */
+ int nProcs;
} WAIT_ORDER;
static bool DeadLockCheckRecurse(PROC *proc);
static bool TestConfiguration(PROC *startProc);
static bool FindLockCycle(PROC *checkProc,
- EDGE *softEdges, int *nSoftEdges);
+ EDGE *softEdges, int *nSoftEdges);
static bool FindLockCycleRecurse(PROC *checkProc,
- EDGE *softEdges, int *nSoftEdges);
+ EDGE *softEdges, int *nSoftEdges);
static bool ExpandConstraints(EDGE *constraints, int nConstraints);
static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
- PROC **ordering);
+ PROC **ordering);
+
#ifdef DEBUG_DEADLOCK
static void PrintLockQueue(LOCK *lock, const char *info);
+
#endif
@@ -64,30 +68,34 @@ static void PrintLockQueue(LOCK *lock, const char *info);
/* Workspace for FindLockCycle */
static PROC **visitedProcs; /* Array of visited procs */
-static int nVisitedProcs;
+static int nVisitedProcs;
+
/* Workspace for TopoSort */
static PROC **topoProcs; /* Array of not-yet-output procs */
static int *beforeConstraints; /* Counts of remaining before-constraints */
static int *afterConstraints; /* List head for after-constraints */
+
/* Output area for ExpandConstraints */
static WAIT_ORDER *waitOrders; /* Array of proposed queue rearrangements */
-static int nWaitOrders;
+static int nWaitOrders;
static PROC **waitOrderProcs; /* Space for waitOrders queue contents */
+
/* Current list of constraints being considered */
static EDGE *curConstraints;
-static int nCurConstraints;
-static int maxCurConstraints;
+static int nCurConstraints;
+static int maxCurConstraints;
+
/* Storage space for results from FindLockCycle */
static EDGE *possibleConstraints;
-static int nPossibleConstraints;
-static int maxPossibleConstraints;
+static int nPossibleConstraints;
+static int maxPossibleConstraints;
/*
* InitDeadLockChecking -- initialize deadlock checker during backend startup
*
* This does per-backend initialization of the deadlock checker; primarily,
- * allocation of working memory for DeadLockCheck. We do this per-backend
+ * allocation of working memory for DeadLockCheck. We do this per-backend
* since there's no percentage in making the kernel do copy-on-write
* inheritance of workspace from the postmaster. We want to allocate the
* space at startup because the deadlock checker might be invoked when there's
@@ -96,7 +104,7 @@ static int maxPossibleConstraints;
void
InitDeadLockChecking(void)
{
- MemoryContext oldcxt;
+ MemoryContext oldcxt;
/* Make sure allocations are permanent */
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
@@ -116,20 +124,21 @@ InitDeadLockChecking(void)
/*
* We need to consider rearranging at most MaxBackends/2 wait queues
- * (since it takes at least two waiters in a queue to create a soft edge),
- * and the expanded form of the wait queues can't involve more than
- * MaxBackends total waiters.
+ * (since it takes at least two waiters in a queue to create a soft
+ * edge), and the expanded form of the wait queues can't involve more
+ * than MaxBackends total waiters.
*/
- waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER));
+ waitOrders = (WAIT_ORDER *) palloc((MaxBackends / 2) * sizeof(WAIT_ORDER));
waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
/*
* Allow at most MaxBackends distinct constraints in a configuration.
- * (Is this enough? In practice it seems it should be, but I don't quite
- * see how to prove it. If we run out, we might fail to find a workable
- * wait queue rearrangement even though one exists.) NOTE that this
- * number limits the maximum recursion depth of DeadLockCheckRecurse.
- * Making it really big might potentially allow a stack-overflow problem.
+ * (Is this enough? In practice it seems it should be, but I don't
+ * quite see how to prove it. If we run out, we might fail to find a
+ * workable wait queue rearrangement even though one exists.) NOTE
+ * that this number limits the maximum recursion depth of
+ * DeadLockCheckRecurse. Making it really big might potentially allow
+ * a stack-overflow problem.
*/
maxCurConstraints = MaxBackends;
curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
@@ -139,8 +148,8 @@ InitDeadLockChecking(void)
* re-run TestConfiguration. (This is probably more than enough, but
* we can survive if we run low on space by doing excess runs of
* TestConfiguration to re-compute constraint lists each time needed.)
- * The last MaxBackends entries in possibleConstraints[] are reserved as
- * output workspace for FindLockCycle.
+ * The last MaxBackends entries in possibleConstraints[] are reserved
+ * as output workspace for FindLockCycle.
*/
maxPossibleConstraints = MaxBackends * 4;
possibleConstraints =
@@ -185,9 +194,9 @@ DeadLockCheck(PROC *proc)
/* Apply any needed rearrangements of wait queues */
for (i = 0; i < nWaitOrders; i++)
{
- LOCK *lock = waitOrders[i].lock;
- PROC **procs = waitOrders[i].procs;
- int nProcs = waitOrders[i].nProcs;
+ LOCK *lock = waitOrders[i].lock;
+ PROC **procs = waitOrders[i].procs;
+ int nProcs = waitOrders[i].nProcs;
PROC_QUEUE *waitQueue = &(lock->waitProcs);
Assert(nProcs == waitQueue->size);
@@ -218,10 +227,10 @@ DeadLockCheck(PROC *proc)
* DeadLockCheckRecurse -- recursively search for valid orderings
*
* curConstraints[] holds the current set of constraints being considered
- * by an outer level of recursion. Add to this each possible solution
+ * by an outer level of recursion. Add to this each possible solution
* constraint for any cycle detected at this level.
*
- * Returns TRUE if no solution exists. Returns FALSE if a deadlock-free
+ * Returns TRUE if no solution exists. Returns FALSE if a deadlock-free
* state is attainable, in which case waitOrders[] shows the required
* rearrangements of lock wait queues (if any).
*/
@@ -252,6 +261,7 @@ DeadLockCheckRecurse(PROC *proc)
/* Not room; will need to regenerate the edges on-the-fly */
savedList = false;
}
+
/*
* Try each available soft edge as an addition to the configuration.
*/
@@ -264,7 +274,7 @@ DeadLockCheckRecurse(PROC *proc)
elog(FATAL, "DeadLockCheckRecurse: inconsistent results");
}
curConstraints[nCurConstraints] =
- possibleConstraints[oldPossibleConstraints+i];
+ possibleConstraints[oldPossibleConstraints + i];
nCurConstraints++;
if (!DeadLockCheckRecurse(proc))
return false; /* found a valid solution! */
@@ -293,25 +303,27 @@ DeadLockCheckRecurse(PROC *proc)
static bool
TestConfiguration(PROC *startProc)
{
- int softFound = 0;
- EDGE *softEdges = possibleConstraints + nPossibleConstraints;
- int nSoftEdges;
- int i;
+ int softFound = 0;
+ EDGE *softEdges = possibleConstraints + nPossibleConstraints;
+ int nSoftEdges;
+ int i;
/*
* Make sure we have room for FindLockCycle's output.
*/
if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
return -1;
+
/*
* Expand current constraint set into wait orderings. Fail if the
* constraint set is not self-consistent.
*/
if (!ExpandConstraints(curConstraints, nCurConstraints))
return -1;
+
/*
* Check for cycles involving startProc or any of the procs mentioned
- * in constraints. We check startProc last because if it has a soft
+ * in constraints. We check startProc last because if it has a soft
* cycle still to be dealt with, we want to deal with that first.
*/
for (i = 0; i < nCurConstraints; i++)
@@ -350,7 +362,7 @@ TestConfiguration(PROC *startProc)
*
* Since we need to be able to check hypothetical configurations that would
* exist after wait queue rearrangement, the routine pays attention to the
- * table of hypothetical queue orders in waitOrders[]. These orders will
+ * table of hypothetical queue orders in waitOrders[]. These orders will
* be believed in preference to the actual ordering seen in the locktable.
*/
static bool
@@ -391,9 +403,10 @@ FindLockCycleRecurse(PROC *checkProc,
/* If we return to starting point, we have a deadlock cycle */
if (i == 0)
return true;
+
/*
- * Otherwise, we have a cycle but it does not include the start
- * point, so say "no deadlock".
+ * Otherwise, we have a cycle but it does not include the
+ * start point, so say "no deadlock".
*/
return false;
}
@@ -401,6 +414,7 @@ FindLockCycleRecurse(PROC *checkProc,
/* Mark proc as seen */
Assert(nVisitedProcs < MaxBackends);
visitedProcs[nVisitedProcs++] = checkProc;
+
/*
* If the proc is not waiting, we have no outgoing waits-for edges.
*/
@@ -413,8 +427,9 @@ FindLockCycleRecurse(PROC *checkProc,
lockctl = lockMethodTable->ctl;
numLockModes = lockctl->numLockModes;
conflictMask = lockctl->conflictTab[checkProc->waitLockMode];
+
/*
- * Scan for procs that already hold conflicting locks. These are
+ * Scan for procs that already hold conflicting locks. These are
* "hard" edges in the waits-for graph.
*/
lockHolders = &(lock->lockHolders);
@@ -449,12 +464,13 @@ FindLockCycleRecurse(PROC *checkProc,
/*
* Scan for procs that are ahead of this one in the lock's wait queue.
- * Those that have conflicting requests soft-block this one. This must
- * be done after the hard-block search, since if another proc both
- * hard- and soft-blocks this one, we want to call it a hard edge.
+ * Those that have conflicting requests soft-block this one. This
+ * must be done after the hard-block search, since if another proc
+ * both hard- and soft-blocks this one, we want to call it a hard
+ * edge.
*
- * If there is a proposed re-ordering of the lock's wait order,
- * use that rather than the current wait order.
+ * If there is a proposed re-ordering of the lock's wait order, use that
+ * rather than the current wait order.
*/
for (i = 0; i < nWaitOrders; i++)
{
@@ -465,7 +481,7 @@ FindLockCycleRecurse(PROC *checkProc,
if (i < nWaitOrders)
{
/* Use the given hypothetical wait queue order */
- PROC **procs = waitOrders[i].procs;
+ PROC **procs = waitOrders[i].procs;
queue_size = waitOrders[i].nProcs;
@@ -483,7 +499,11 @@ FindLockCycleRecurse(PROC *checkProc,
/* This proc soft-blocks checkProc */
if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
{
- /* Add this edge to the list of soft edges in the cycle */
+
+ /*
+ * Add this edge to the list of soft edges in the
+ * cycle
+ */
Assert(*nSoftEdges < MaxBackends);
softEdges[*nSoftEdges].waiter = checkProc;
softEdges[*nSoftEdges].blocker = proc;
@@ -513,7 +533,11 @@ FindLockCycleRecurse(PROC *checkProc,
/* This proc soft-blocks checkProc */
if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
{
- /* Add this edge to the list of soft edges in the cycle */
+
+ /*
+ * Add this edge to the list of soft edges in the
+ * cycle
+ */
Assert(*nSoftEdges < MaxBackends);
softEdges[*nSoftEdges].waiter = checkProc;
softEdges[*nSoftEdges].blocker = proc;
@@ -553,18 +577,19 @@ ExpandConstraints(EDGE *constraints,
j;
nWaitOrders = 0;
+
/*
- * Scan constraint list backwards. This is because the last-added
+ * Scan constraint list backwards. This is because the last-added
* constraint is the only one that could fail, and so we want to test
* it for inconsistency first.
*/
- for (i = nConstraints; --i >= 0; )
+ for (i = nConstraints; --i >= 0;)
{
- PROC *proc = constraints[i].waiter;
- LOCK *lock = proc->waitLock;
+ PROC *proc = constraints[i].waiter;
+ LOCK *lock = proc->waitLock;
/* Did we already make a list for this lock? */
- for (j = nWaitOrders; --j >= 0; )
+ for (j = nWaitOrders; --j >= 0;)
{
if (waitOrders[j].lock == lock)
break;
@@ -577,11 +602,12 @@ ExpandConstraints(EDGE *constraints,
waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
nWaitOrderProcs += lock->waitProcs.size;
Assert(nWaitOrderProcs <= MaxBackends);
+
/*
* Do the topo sort. TopoSort need not examine constraints after
* this one, since they must be for different locks.
*/
- if (!TopoSort(lock, constraints, i+1,
+ if (!TopoSort(lock, constraints, i + 1,
waitOrders[nWaitOrders].procs))
return false;
nWaitOrders++;
@@ -607,7 +633,7 @@ ExpandConstraints(EDGE *constraints,
* The initial queue ordering is taken directly from the lock's wait queue.
* The output is an array of PROC pointers, of length equal to the lock's
* wait queue length (the caller is responsible for providing this space).
- * The partial order is specified by an array of EDGE structs. Each EDGE
+ * The partial order is specified by an array of EDGE structs. Each EDGE
* is one that we need to reverse, therefore the "waiter" must appear before
* the "blocker" in the output array. The EDGE array may well contain
* edges associated with other locks; these should be ignored.
@@ -638,14 +664,15 @@ TopoSort(LOCK *lock,
}
/*
- * Scan the constraints, and for each proc in the array, generate a count
- * of the number of constraints that say it must be before something else,
- * plus a list of the constraints that say it must be after something else.
- * The count for the j'th proc is stored in beforeConstraints[j], and the
- * head of its list in afterConstraints[j]. Each constraint stores its
- * list link in constraints[i].link (note any constraint will be in
- * just one list). The array index for the before-proc of the i'th
- * constraint is remembered in constraints[i].pred.
+ * Scan the constraints, and for each proc in the array, generate a
+ * count of the number of constraints that say it must be before
+ * something else, plus a list of the constraints that say it must be
+ * after something else. The count for the j'th proc is stored in
+ * beforeConstraints[j], and the head of its list in
+ * afterConstraints[j]. Each constraint stores its list link in
+ * constraints[i].link (note any constraint will be in just one list).
+ * The array index for the before-proc of the i'th constraint is
+ * remembered in constraints[i].pred.
*/
MemSet(beforeConstraints, 0, queue_size * sizeof(int));
MemSet(afterConstraints, 0, queue_size * sizeof(int));
@@ -656,7 +683,7 @@ TopoSort(LOCK *lock,
if (proc->waitLock != lock)
continue;
/* Find the waiter proc in the array */
- for (j = queue_size; --j >= 0; )
+ for (j = queue_size; --j >= 0;)
{
if (topoProcs[j] == proc)
break;
@@ -664,20 +691,20 @@ TopoSort(LOCK *lock,
Assert(j >= 0); /* should have found a match */
/* Find the blocker proc in the array */
proc = constraints[i].blocker;
- for (k = queue_size; --k >= 0; )
+ for (k = queue_size; --k >= 0;)
{
if (topoProcs[k] == proc)
break;
}
Assert(k >= 0); /* should have found a match */
- beforeConstraints[j]++; /* waiter must come before */
+ beforeConstraints[j]++; /* waiter must come before */
/* add this constraint to list of after-constraints for blocker */
constraints[i].pred = j;
constraints[i].link = afterConstraints[k];
- afterConstraints[k] = i+1;
+ afterConstraints[k] = i + 1;
}
/*--------------------
- * Now scan the topoProcs array backwards. At each step, output the
+ * Now scan the topoProcs array backwards. At each step, output the
* last proc that has no remaining before-constraints, and decrease
* the beforeConstraints count of each of the procs it was constrained
* against.
@@ -687,8 +714,8 @@ TopoSort(LOCK *lock,
* last = last non-null index in topoProcs (avoid redundant searches)
*--------------------
*/
- last = queue_size-1;
- for (i = queue_size; --i >= 0; )
+ last = queue_size - 1;
+ for (i = queue_size; --i >= 0;)
{
/* Find next candidate to output */
while (topoProcs[last] == NULL)
@@ -705,10 +732,8 @@ TopoSort(LOCK *lock,
ordering[i] = topoProcs[j];
topoProcs[j] = NULL;
/* Update beforeConstraints counts of its predecessors */
- for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link)
- {
- beforeConstraints[constraints[k-1].pred]--;
- }
+ for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link)
+ beforeConstraints[constraints[k - 1].pred]--;
}
/* Done */
@@ -734,4 +759,5 @@ PrintLockQueue(LOCK *lock, const char *info)
printf("\n");
fflush(stdout);
}
+
#endif