diff options
Diffstat (limited to 'src/backend/storage/lmgr/deadlock.c')
-rw-r--r-- | src/backend/storage/lmgr/deadlock.c | 188 |
1 files changed, 107 insertions, 81 deletions
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index 31db44e74b..160fc64fb2 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -12,7 +12,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.2 2001/01/25 03:45:50 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.3 2001/03/22 03:59:46 momjian Exp $ * * Interface: * @@ -29,32 +29,36 @@ /* One edge in the waits-for graph */ -typedef struct { - PROC *waiter; /* the waiting process */ - PROC *blocker; /* the process it is waiting for */ - int pred; /* workspace for TopoSort */ - int link; /* workspace for TopoSort */ +typedef struct +{ + PROC *waiter; /* the waiting process */ + PROC *blocker; /* the process it is waiting for */ + int pred; /* workspace for TopoSort */ + int link; /* workspace for TopoSort */ } EDGE; /* One potential reordering of a lock's wait queue */ -typedef struct { - LOCK *lock; /* the lock whose wait queue is described */ - PROC **procs; /* array of PROC *'s in new wait order */ - int nProcs; +typedef struct +{ + LOCK *lock; /* the lock whose wait queue is described */ + PROC **procs; /* array of PROC *'s in new wait order */ + int nProcs; } WAIT_ORDER; static bool DeadLockCheckRecurse(PROC *proc); static bool TestConfiguration(PROC *startProc); static bool FindLockCycle(PROC *checkProc, - EDGE *softEdges, int *nSoftEdges); + EDGE *softEdges, int *nSoftEdges); static bool FindLockCycleRecurse(PROC *checkProc, - EDGE *softEdges, int *nSoftEdges); + EDGE *softEdges, int *nSoftEdges); static bool ExpandConstraints(EDGE *constraints, int nConstraints); static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints, - PROC **ordering); + PROC **ordering); + #ifdef DEBUG_DEADLOCK static void PrintLockQueue(LOCK *lock, const char *info); + #endif @@ -64,30 +68,34 @@ static void PrintLockQueue(LOCK *lock, const char *info); /* Workspace for FindLockCycle */ static PROC **visitedProcs; /* Array of visited procs */ -static int nVisitedProcs; +static int nVisitedProcs; + /* Workspace for TopoSort */ static PROC **topoProcs; /* Array of not-yet-output procs */ static int *beforeConstraints; /* Counts of remaining before-constraints */ static int *afterConstraints; /* List head for after-constraints */ + /* Output area for ExpandConstraints */ static WAIT_ORDER *waitOrders; /* Array of proposed queue rearrangements */ -static int nWaitOrders; +static int nWaitOrders; static PROC **waitOrderProcs; /* Space for waitOrders queue contents */ + /* Current list of constraints being considered */ static EDGE *curConstraints; -static int nCurConstraints; -static int maxCurConstraints; +static int nCurConstraints; +static int maxCurConstraints; + /* Storage space for results from FindLockCycle */ static EDGE *possibleConstraints; -static int nPossibleConstraints; -static int maxPossibleConstraints; +static int nPossibleConstraints; +static int maxPossibleConstraints; /* * InitDeadLockChecking -- initialize deadlock checker during backend startup * * This does per-backend initialization of the deadlock checker; primarily, - * allocation of working memory for DeadLockCheck. We do this per-backend + * allocation of working memory for DeadLockCheck. We do this per-backend * since there's no percentage in making the kernel do copy-on-write * inheritance of workspace from the postmaster. We want to allocate the * space at startup because the deadlock checker might be invoked when there's @@ -96,7 +104,7 @@ static int maxPossibleConstraints; void InitDeadLockChecking(void) { - MemoryContext oldcxt; + MemoryContext oldcxt; /* Make sure allocations are permanent */ oldcxt = MemoryContextSwitchTo(TopMemoryContext); @@ -116,20 +124,21 @@ InitDeadLockChecking(void) /* * We need to consider rearranging at most MaxBackends/2 wait queues - * (since it takes at least two waiters in a queue to create a soft edge), - * and the expanded form of the wait queues can't involve more than - * MaxBackends total waiters. + * (since it takes at least two waiters in a queue to create a soft + * edge), and the expanded form of the wait queues can't involve more + * than MaxBackends total waiters. */ - waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER)); + waitOrders = (WAIT_ORDER *) palloc((MaxBackends / 2) * sizeof(WAIT_ORDER)); waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *)); /* * Allow at most MaxBackends distinct constraints in a configuration. - * (Is this enough? In practice it seems it should be, but I don't quite - * see how to prove it. If we run out, we might fail to find a workable - * wait queue rearrangement even though one exists.) NOTE that this - * number limits the maximum recursion depth of DeadLockCheckRecurse. - * Making it really big might potentially allow a stack-overflow problem. + * (Is this enough? In practice it seems it should be, but I don't + * quite see how to prove it. If we run out, we might fail to find a + * workable wait queue rearrangement even though one exists.) NOTE + * that this number limits the maximum recursion depth of + * DeadLockCheckRecurse. Making it really big might potentially allow + * a stack-overflow problem. */ maxCurConstraints = MaxBackends; curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE)); @@ -139,8 +148,8 @@ InitDeadLockChecking(void) * re-run TestConfiguration. (This is probably more than enough, but * we can survive if we run low on space by doing excess runs of * TestConfiguration to re-compute constraint lists each time needed.) - * The last MaxBackends entries in possibleConstraints[] are reserved as - * output workspace for FindLockCycle. + * The last MaxBackends entries in possibleConstraints[] are reserved + * as output workspace for FindLockCycle. */ maxPossibleConstraints = MaxBackends * 4; possibleConstraints = @@ -185,9 +194,9 @@ DeadLockCheck(PROC *proc) /* Apply any needed rearrangements of wait queues */ for (i = 0; i < nWaitOrders; i++) { - LOCK *lock = waitOrders[i].lock; - PROC **procs = waitOrders[i].procs; - int nProcs = waitOrders[i].nProcs; + LOCK *lock = waitOrders[i].lock; + PROC **procs = waitOrders[i].procs; + int nProcs = waitOrders[i].nProcs; PROC_QUEUE *waitQueue = &(lock->waitProcs); Assert(nProcs == waitQueue->size); @@ -218,10 +227,10 @@ DeadLockCheck(PROC *proc) * DeadLockCheckRecurse -- recursively search for valid orderings * * curConstraints[] holds the current set of constraints being considered - * by an outer level of recursion. Add to this each possible solution + * by an outer level of recursion. Add to this each possible solution * constraint for any cycle detected at this level. * - * Returns TRUE if no solution exists. Returns FALSE if a deadlock-free + * Returns TRUE if no solution exists. Returns FALSE if a deadlock-free * state is attainable, in which case waitOrders[] shows the required * rearrangements of lock wait queues (if any). */ @@ -252,6 +261,7 @@ DeadLockCheckRecurse(PROC *proc) /* Not room; will need to regenerate the edges on-the-fly */ savedList = false; } + /* * Try each available soft edge as an addition to the configuration. */ @@ -264,7 +274,7 @@ DeadLockCheckRecurse(PROC *proc) elog(FATAL, "DeadLockCheckRecurse: inconsistent results"); } curConstraints[nCurConstraints] = - possibleConstraints[oldPossibleConstraints+i]; + possibleConstraints[oldPossibleConstraints + i]; nCurConstraints++; if (!DeadLockCheckRecurse(proc)) return false; /* found a valid solution! */ @@ -293,25 +303,27 @@ DeadLockCheckRecurse(PROC *proc) static bool TestConfiguration(PROC *startProc) { - int softFound = 0; - EDGE *softEdges = possibleConstraints + nPossibleConstraints; - int nSoftEdges; - int i; + int softFound = 0; + EDGE *softEdges = possibleConstraints + nPossibleConstraints; + int nSoftEdges; + int i; /* * Make sure we have room for FindLockCycle's output. */ if (nPossibleConstraints + MaxBackends > maxPossibleConstraints) return -1; + /* * Expand current constraint set into wait orderings. Fail if the * constraint set is not self-consistent. */ if (!ExpandConstraints(curConstraints, nCurConstraints)) return -1; + /* * Check for cycles involving startProc or any of the procs mentioned - * in constraints. We check startProc last because if it has a soft + * in constraints. We check startProc last because if it has a soft * cycle still to be dealt with, we want to deal with that first. */ for (i = 0; i < nCurConstraints; i++) @@ -350,7 +362,7 @@ TestConfiguration(PROC *startProc) * * Since we need to be able to check hypothetical configurations that would * exist after wait queue rearrangement, the routine pays attention to the - * table of hypothetical queue orders in waitOrders[]. These orders will + * table of hypothetical queue orders in waitOrders[]. These orders will * be believed in preference to the actual ordering seen in the locktable. */ static bool @@ -391,9 +403,10 @@ FindLockCycleRecurse(PROC *checkProc, /* If we return to starting point, we have a deadlock cycle */ if (i == 0) return true; + /* - * Otherwise, we have a cycle but it does not include the start - * point, so say "no deadlock". + * Otherwise, we have a cycle but it does not include the + * start point, so say "no deadlock". */ return false; } @@ -401,6 +414,7 @@ FindLockCycleRecurse(PROC *checkProc, /* Mark proc as seen */ Assert(nVisitedProcs < MaxBackends); visitedProcs[nVisitedProcs++] = checkProc; + /* * If the proc is not waiting, we have no outgoing waits-for edges. */ @@ -413,8 +427,9 @@ FindLockCycleRecurse(PROC *checkProc, lockctl = lockMethodTable->ctl; numLockModes = lockctl->numLockModes; conflictMask = lockctl->conflictTab[checkProc->waitLockMode]; + /* - * Scan for procs that already hold conflicting locks. These are + * Scan for procs that already hold conflicting locks. These are * "hard" edges in the waits-for graph. */ lockHolders = &(lock->lockHolders); @@ -449,12 +464,13 @@ FindLockCycleRecurse(PROC *checkProc, /* * Scan for procs that are ahead of this one in the lock's wait queue. - * Those that have conflicting requests soft-block this one. This must - * be done after the hard-block search, since if another proc both - * hard- and soft-blocks this one, we want to call it a hard edge. + * Those that have conflicting requests soft-block this one. This + * must be done after the hard-block search, since if another proc + * both hard- and soft-blocks this one, we want to call it a hard + * edge. * - * If there is a proposed re-ordering of the lock's wait order, - * use that rather than the current wait order. + * If there is a proposed re-ordering of the lock's wait order, use that + * rather than the current wait order. */ for (i = 0; i < nWaitOrders; i++) { @@ -465,7 +481,7 @@ FindLockCycleRecurse(PROC *checkProc, if (i < nWaitOrders) { /* Use the given hypothetical wait queue order */ - PROC **procs = waitOrders[i].procs; + PROC **procs = waitOrders[i].procs; queue_size = waitOrders[i].nProcs; @@ -483,7 +499,11 @@ FindLockCycleRecurse(PROC *checkProc, /* This proc soft-blocks checkProc */ if (FindLockCycleRecurse(proc, softEdges, nSoftEdges)) { - /* Add this edge to the list of soft edges in the cycle */ + + /* + * Add this edge to the list of soft edges in the + * cycle + */ Assert(*nSoftEdges < MaxBackends); softEdges[*nSoftEdges].waiter = checkProc; softEdges[*nSoftEdges].blocker = proc; @@ -513,7 +533,11 @@ FindLockCycleRecurse(PROC *checkProc, /* This proc soft-blocks checkProc */ if (FindLockCycleRecurse(proc, softEdges, nSoftEdges)) { - /* Add this edge to the list of soft edges in the cycle */ + + /* + * Add this edge to the list of soft edges in the + * cycle + */ Assert(*nSoftEdges < MaxBackends); softEdges[*nSoftEdges].waiter = checkProc; softEdges[*nSoftEdges].blocker = proc; @@ -553,18 +577,19 @@ ExpandConstraints(EDGE *constraints, j; nWaitOrders = 0; + /* - * Scan constraint list backwards. This is because the last-added + * Scan constraint list backwards. This is because the last-added * constraint is the only one that could fail, and so we want to test * it for inconsistency first. */ - for (i = nConstraints; --i >= 0; ) + for (i = nConstraints; --i >= 0;) { - PROC *proc = constraints[i].waiter; - LOCK *lock = proc->waitLock; + PROC *proc = constraints[i].waiter; + LOCK *lock = proc->waitLock; /* Did we already make a list for this lock? */ - for (j = nWaitOrders; --j >= 0; ) + for (j = nWaitOrders; --j >= 0;) { if (waitOrders[j].lock == lock) break; @@ -577,11 +602,12 @@ ExpandConstraints(EDGE *constraints, waitOrders[nWaitOrders].nProcs = lock->waitProcs.size; nWaitOrderProcs += lock->waitProcs.size; Assert(nWaitOrderProcs <= MaxBackends); + /* * Do the topo sort. TopoSort need not examine constraints after * this one, since they must be for different locks. */ - if (!TopoSort(lock, constraints, i+1, + if (!TopoSort(lock, constraints, i + 1, waitOrders[nWaitOrders].procs)) return false; nWaitOrders++; @@ -607,7 +633,7 @@ ExpandConstraints(EDGE *constraints, * The initial queue ordering is taken directly from the lock's wait queue. * The output is an array of PROC pointers, of length equal to the lock's * wait queue length (the caller is responsible for providing this space). - * The partial order is specified by an array of EDGE structs. Each EDGE + * The partial order is specified by an array of EDGE structs. Each EDGE * is one that we need to reverse, therefore the "waiter" must appear before * the "blocker" in the output array. The EDGE array may well contain * edges associated with other locks; these should be ignored. @@ -638,14 +664,15 @@ TopoSort(LOCK *lock, } /* - * Scan the constraints, and for each proc in the array, generate a count - * of the number of constraints that say it must be before something else, - * plus a list of the constraints that say it must be after something else. - * The count for the j'th proc is stored in beforeConstraints[j], and the - * head of its list in afterConstraints[j]. Each constraint stores its - * list link in constraints[i].link (note any constraint will be in - * just one list). The array index for the before-proc of the i'th - * constraint is remembered in constraints[i].pred. + * Scan the constraints, and for each proc in the array, generate a + * count of the number of constraints that say it must be before + * something else, plus a list of the constraints that say it must be + * after something else. The count for the j'th proc is stored in + * beforeConstraints[j], and the head of its list in + * afterConstraints[j]. Each constraint stores its list link in + * constraints[i].link (note any constraint will be in just one list). + * The array index for the before-proc of the i'th constraint is + * remembered in constraints[i].pred. */ MemSet(beforeConstraints, 0, queue_size * sizeof(int)); MemSet(afterConstraints, 0, queue_size * sizeof(int)); @@ -656,7 +683,7 @@ TopoSort(LOCK *lock, if (proc->waitLock != lock) continue; /* Find the waiter proc in the array */ - for (j = queue_size; --j >= 0; ) + for (j = queue_size; --j >= 0;) { if (topoProcs[j] == proc) break; @@ -664,20 +691,20 @@ TopoSort(LOCK *lock, Assert(j >= 0); /* should have found a match */ /* Find the blocker proc in the array */ proc = constraints[i].blocker; - for (k = queue_size; --k >= 0; ) + for (k = queue_size; --k >= 0;) { if (topoProcs[k] == proc) break; } Assert(k >= 0); /* should have found a match */ - beforeConstraints[j]++; /* waiter must come before */ + beforeConstraints[j]++; /* waiter must come before */ /* add this constraint to list of after-constraints for blocker */ constraints[i].pred = j; constraints[i].link = afterConstraints[k]; - afterConstraints[k] = i+1; + afterConstraints[k] = i + 1; } /*-------------------- - * Now scan the topoProcs array backwards. At each step, output the + * Now scan the topoProcs array backwards. At each step, output the * last proc that has no remaining before-constraints, and decrease * the beforeConstraints count of each of the procs it was constrained * against. @@ -687,8 +714,8 @@ TopoSort(LOCK *lock, * last = last non-null index in topoProcs (avoid redundant searches) *-------------------- */ - last = queue_size-1; - for (i = queue_size; --i >= 0; ) + last = queue_size - 1; + for (i = queue_size; --i >= 0;) { /* Find next candidate to output */ while (topoProcs[last] == NULL) @@ -705,10 +732,8 @@ TopoSort(LOCK *lock, ordering[i] = topoProcs[j]; topoProcs[j] = NULL; /* Update beforeConstraints counts of its predecessors */ - for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link) - { - beforeConstraints[constraints[k-1].pred]--; - } + for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link) + beforeConstraints[constraints[k - 1].pred]--; } /* Done */ @@ -734,4 +759,5 @@ PrintLockQueue(LOCK *lock, const char *info) printf("\n"); fflush(stdout); } + #endif |