diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
| -rw-r--r-- | src/backend/storage/lmgr/lock.c | 365 |
1 files changed, 187 insertions, 178 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index e7d1b678be..fa2b98cc46 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.77 2001/01/14 05:08:15 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.78 2001/01/16 06:11:34 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -43,8 +43,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, LOCK *lock, HOLDER *holder); static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, - int *myHolders); -static int LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc); + int *myHolding); +static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc); static char *lock_types[] = { @@ -91,7 +91,7 @@ LOCK_DEBUG_ENABLED(const LOCK * lock) return (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks) || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks)) - && (lock->tag.relId >= Trace_lock_oidmin)) + && (lock->tag.relId >= (Oid) Trace_lock_oidmin)) || (Trace_lock_table && (lock->tag.relId == Trace_lock_table)); } @@ -101,17 +101,18 @@ LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type) { if (LOCK_DEBUG_ENABLED(lock)) elog(DEBUG, - "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) " - "hold(%d,%d,%d,%d,%d,%d,%d)=%d " - "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", + "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) " + "req(%d,%d,%d,%d,%d,%d,%d)=%d " + "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", where, MAKE_OFFSET(lock), lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId, - lock->tag.objId.blkno, lock->mask, - lock->holders[1], lock->holders[2], lock->holders[3], lock->holders[4], - lock->holders[5], lock->holders[6], lock->holders[7], lock->nHolding, - lock->activeHolders[1], lock->activeHolders[2], lock->activeHolders[3], - lock->activeHolders[4], lock->activeHolders[5], lock->activeHolders[6], - lock->activeHolders[7], lock->nActive, + lock->tag.objId.blkno, lock->grantMask, + lock->requested[1], lock->requested[2], lock->requested[3], + lock->requested[4], lock->requested[5], lock->requested[6], + lock->requested[7], lock->nRequested, + lock->granted[1], lock->granted[2], lock->granted[3], + lock->granted[4], lock->granted[5], lock->granted[6], + lock->granted[7], lock->nGranted, lock->waitProcs.size, lock_types[type]); } @@ -122,7 +123,7 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP) if ( (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks) || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks)) - && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= Trace_lock_oidmin)) + && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin)) || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table)) ) elog(DEBUG, @@ -130,8 +131,9 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP) where, MAKE_OFFSET(holderP), holderP->tag.lock, HOLDER_LOCKMETHOD(*(holderP)), holderP->tag.pid, holderP->tag.xid, - holderP->holders[1], holderP->holders[2], holderP->holders[3], holderP->holders[4], - holderP->holders[5], holderP->holders[6], holderP->holders[7], holderP->nHolding); + holderP->holding[1], holderP->holding[2], holderP->holding[3], + holderP->holding[4], holderP->holding[5], holderP->holding[6], + holderP->holding[7], holderP->nHolding); } #else /* not LOCK_DEBUG */ @@ -146,7 +148,12 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP) SPINLOCK LockMgrLock; /* in Shmem or created in * CreateSpinlocks() */ -/* This is to simplify/speed up some bit arithmetic */ +/* + * These are to simplify/speed up some bit arithmetic. + * + * XXX is a fetch from a static array really faster than a shift? + * Wouldn't bet on it... + */ static LOCKMASK BITS_OFF[MAX_LOCKMODES]; static LOCKMASK BITS_ON[MAX_LOCKMODES]; @@ -471,7 +478,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, SPINLOCK masterLock; LOCKMETHODTABLE *lockMethodTable; int status; - int myHolders[MAX_LOCKMODES]; + int myHolding[MAX_LOCKMODES]; int i; #ifdef LOCK_DEBUG @@ -517,20 +524,21 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, */ if (!found) { - lock->mask = 0; - lock->nHolding = 0; - lock->nActive = 0; - MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES); - MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES); + lock->grantMask = 0; + lock->waitMask = 0; + lock->nRequested = 0; + lock->nGranted = 0; + MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES); + MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES); ProcQueueInit(&(lock->waitProcs)); LOCK_PRINT("LockAcquire: new", lock, lockmode); } else { LOCK_PRINT("LockAcquire: found", lock, lockmode); - Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); - Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0)); - Assert(lock->nActive <= lock->nHolding); + Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); + Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); } /* ------------------ @@ -561,15 +569,15 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, if (!found) { holder->nHolding = 0; - MemSet((char *) holder->holders, 0, sizeof(int) * MAX_LOCKMODES); + MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES); ProcAddLock(&holder->queue); HOLDER_PRINT("LockAcquire: new", holder); } else { HOLDER_PRINT("LockAcquire: found", holder); - Assert((holder->nHolding > 0) && (holder->holders[lockmode] >= 0)); - Assert(holder->nHolding <= lock->nActive); + Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0)); + Assert(holder->nHolding <= lock->nGranted); #ifdef CHECK_DEADLOCK_RISK /* @@ -588,7 +596,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, */ for (i = lockMethodTable->ctl->numLockModes; i > 0; i--) { - if (holder->holders[i] > 0) + if (holder->holding[i] > 0) { if (i >= (int) lockmode) break; /* safe: we have a lock >= req level */ @@ -603,21 +611,21 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, } /* ---------------- - * lock->nHolding and lock->holders count the total number of holders - * either holding or waiting for the lock, so increment those immediately. + * lock->nRequested and lock->requested[] count the total number of + * requests, whether granted or waiting, so increment those immediately. * The other counts don't increment till we get the lock. * ---------------- */ - lock->nHolding++; - lock->holders[lockmode]++; - Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0)); + lock->nRequested++; + lock->requested[lockmode]++; + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* -------------------- * If I'm the only one holding any lock on this object, then there * cannot be a conflict. The same is true if I already hold this lock. * -------------------- */ - if (holder->nHolding == lock->nActive || holder->holders[lockmode] != 0) + if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0) { GrantLock(lock, holder, lockmode); HOLDER_PRINT("LockAcquire: owning", holder); @@ -630,8 +638,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * then there is no conflict, either. * -------------------- */ - LockCountMyLocks(holder->tag.lock, MyProc, myHolders); - if (myHolders[lockmode] != 0) + LockCountMyLocks(holder->tag.lock, MyProc, myHolding); + if (myHolding[lockmode] != 0) { GrantLock(lock, holder, lockmode); HOLDER_PRINT("LockAcquire: my other XID owning", holder); @@ -650,7 +658,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, */ for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++) { - if (myHolders[i] > 0 && + if (myHolding[i] > 0 && lockMethodTable->ctl->conflictTab[i] & lock->waitMask) break; /* yes, there is a conflict */ } @@ -664,12 +672,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, else status = LockResolveConflicts(lockmethod, lockmode, lock, holder, - MyProc, myHolders); + MyProc, myHolding); } else status = LockResolveConflicts(lockmethod, lockmode, lock, holder, - MyProc, myHolders); + MyProc, myHolding); if (status == STATUS_OK) GrantLock(lock, holder, lockmode); @@ -694,11 +702,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, } else HOLDER_PRINT("LockAcquire: NHOLDING", holder); - lock->nHolding--; - lock->holders[lockmode]--; + lock->nRequested--; + lock->requested[lockmode]--; LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode); - Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); - Assert(lock->nActive <= lock->nHolding); + Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); SpinRelease(masterLock); return FALSE; } @@ -708,17 +716,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * Construct bitmask of locks this process holds on this object. */ { - int holdLock = 0; + int heldLocks = 0; int tmpMask; for (i = 1, tmpMask = 2; i <= lockMethodTable->ctl->numLockModes; i++, tmpMask <<= 1) { - if (myHolders[i] > 0) - holdLock |= tmpMask; + if (myHolding[i] > 0) + heldLocks |= tmpMask; } - MyProc->holdLock = holdLock; + MyProc->heldLocks = heldLocks; } /* @@ -736,7 +744,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * Check the holder entry status, in case something in the ipc * communication doesn't work correctly. */ - if (!((holder->nHolding > 0) && (holder->holders[lockmode] > 0))) + if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0))) { HOLDER_PRINT("LockAcquire: INCONSISTENT", holder); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); @@ -763,7 +771,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * determining whether or not any new lock acquired conflicts with * the old ones. * - * The caller can optionally pass the process's total holders counts, if + * The caller can optionally pass the process's total holding counts, if * known. If NULL is passed then these values will be computed internally. * ---------------------------- */ @@ -773,28 +781,28 @@ LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, HOLDER *holder, PROC *proc, - int *myHolders) /* myHolders[] array or NULL */ + int *myHolding) /* myHolding[] array or NULL */ { LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl; int numLockModes = lockctl->numLockModes; int bitmask; int i, tmpMask; - int localHolders[MAX_LOCKMODES]; + int localHolding[MAX_LOCKMODES]; - Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0)); + Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0)); /* ---------------------------- * first check for global conflicts: If no locks conflict * with mine, then I get the lock. * - * Checking for conflict: lock->mask represents the types of + * Checking for conflict: lock->grantMask represents the types of * currently held locks. conflictTable[lockmode] has a bit * set for each type of lock that conflicts with mine. Bitwise * compare tells if there is a conflict. * ---------------------------- */ - if (!(lockctl->conflictTab[lockmode] & lock->mask)) + if (!(lockctl->conflictTab[lockmode] & lock->grantMask)) { HOLDER_PRINT("LockResolveConflicts: no conflict", holder); return STATUS_OK; @@ -807,11 +815,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod, * process under another XID also count as "our own locks". * ------------------------ */ - if (myHolders == NULL) + if (myHolding == NULL) { /* Caller didn't do calculation of total holding for me */ - LockCountMyLocks(holder->tag.lock, proc, localHolders); - myHolders = localHolders; + LockCountMyLocks(holder->tag.lock, proc, localHolding); + myHolding = localHolding; } /* Compute mask of lock types held by other processes */ @@ -819,7 +827,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod, tmpMask = 2; for (i = 1; i <= numLockModes; i++, tmpMask <<= 1) { - if (lock->activeHolders[i] != myHolders[i]) + if (lock->granted[i] != myHolding[i]) bitmask |= tmpMask; } @@ -852,20 +860,20 @@ LockResolveConflicts(LOCKMETHOD lockmethod, * be a net slowdown. */ static void -LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders) +LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding) { HOLDER *holder = NULL; HOLDER *nextHolder = NULL; - SHM_QUEUE *lockQueue = &(proc->lockQueue); - SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); + SHM_QUEUE *holderQueue = &(proc->holderQueue); + SHMEM_OFFSET end = MAKE_OFFSET(holderQueue); int i; - MemSet(myHolders, 0, MAX_LOCKMODES * sizeof(int)); + MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int)); - if (SHMQueueEmpty(lockQueue)) + if (SHMQueueEmpty(holderQueue)) return; - SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); + SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); do { @@ -885,7 +893,7 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders) { for (i = 1; i < MAX_LOCKMODES; i++) { - myHolders[i] += holder->holders[i]; + myHolding[i] += holder->holding[i]; } } @@ -894,47 +902,47 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders) } /* - * LockGetMyHoldLocks -- compute bitmask of lock types held by a process + * LockGetMyHeldLocks -- compute bitmask of lock types held by a process * for a given lockable object. */ static int -LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc) +LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc) { - int myHolders[MAX_LOCKMODES]; - int holdLock = 0; + int myHolding[MAX_LOCKMODES]; + int heldLocks = 0; int i, tmpMask; - LockCountMyLocks(lockOffset, proc, myHolders); + LockCountMyLocks(lockOffset, proc, myHolding); for (i = 1, tmpMask = 2; i < MAX_LOCKMODES; i++, tmpMask <<= 1) { - if (myHolders[i] > 0) - holdLock |= tmpMask; + if (myHolding[i] > 0) + heldLocks |= tmpMask; } - return holdLock; + return heldLocks; } /* * GrantLock -- update the lock and holder data structures to show - * the new lock has been granted. + * the lock request has been granted. */ void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode) { - lock->nActive++; - lock->activeHolders[lockmode]++; - lock->mask |= BITS_ON[lockmode]; - if (lock->activeHolders[lockmode] == lock->holders[lockmode]) + lock->nGranted++; + lock->granted[lockmode]++; + lock->grantMask |= BITS_ON[lockmode]; + if (lock->granted[lockmode] == lock->requested[lockmode]) lock->waitMask &= BITS_OFF[lockmode]; LOCK_PRINT("GrantLock", lock, lockmode); - Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0)); - Assert(lock->nActive <= lock->nHolding); - holder->holders[lockmode]++; + Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); + Assert(lock->nGranted <= lock->nRequested); + holder->holding[lockmode]++; holder->nHolding++; - Assert((holder->nHolding > 0) && (holder->holders[lockmode] > 0)); + Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0)); } /* @@ -952,14 +960,6 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, Assert(lockmethod < NumLockMethods); - /* - * the waitqueue is ordered by priority. I insert myself according to - * the priority of the lock I am acquiring. - * - * SYNC NOTE: I am assuming that the lock table spinlock is sufficient - * synchronization for this queue. That will not be true if/when - * people can be deleted from the queue by a SIGINT or something. - */ LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode); old_status = pstrdup(get_ps_display()); @@ -971,7 +971,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, /* * NOTE: Think not to put any lock state cleanup after the call to * ProcSleep, in either the normal or failure path. The lock state - * must be fully set by the lock grantor, or by HandleDeadlock if we + * must be fully set by the lock grantor, or by HandleDeadLock if we * give up waiting for the lock. This is necessary because of the * possibility that a cancel/die interrupt will interrupt ProcSleep * after someone else grants us the lock, but before we've noticed it. @@ -1074,9 +1074,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, return FALSE; } LOCK_PRINT("LockRelease: found", lock, lockmode); - Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); - Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0)); - Assert(lock->nActive <= lock->nHolding); /* * Find the holder entry for this holder. @@ -1107,29 +1104,32 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, * Check that we are actually holding a lock of the type we want to * release. */ - if (!(holder->holders[lockmode] > 0)) + if (!(holder->holding[lockmode] > 0)) { SpinRelease(masterLock); HOLDER_PRINT("LockRelease: WRONGTYPE", holder); elog(NOTICE, "LockRelease: you don't own a lock of type %s", lock_types[lockmode]); - Assert(holder->holders[lockmode] >= 0); + Assert(holder->holding[lockmode] >= 0); return FALSE; } Assert(holder->nHolding > 0); + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); + Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); + Assert(lock->nGranted <= lock->nRequested); /* * fix the general lock stats */ - lock->nHolding--; - lock->holders[lockmode]--; - lock->nActive--; - lock->activeHolders[lockmode]--; + lock->nRequested--; + lock->requested[lockmode]--; + lock->nGranted--; + lock->granted[lockmode]--; - if (!(lock->activeHolders[lockmode])) + if (lock->granted[lockmode] == 0) { /* change the conflict mask. No more of this lock type. */ - lock->mask &= BITS_OFF[lockmode]; + lock->grantMask &= BITS_OFF[lockmode]; } #ifdef NOT_USED @@ -1139,14 +1139,14 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, * with the remaining locks. * -------------------------- */ - if (lock->activeHolders[lockmode]) + if (lock->granted[lockmode]) wakeupNeeded = false; else #endif /* * Above is not valid any more (due to MVCC lock modes). Actually - * we should compare activeHolders[lockmode] with number of + * we should compare granted[lockmode] with number of * waiters holding lock of this type and try to wakeup only if * these numbers are equal (and lock released conflicts with locks * requested by waiters). For the moment we only check the last @@ -1156,11 +1156,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, wakeupNeeded = true; LOCK_PRINT("LockRelease: updated", lock, lockmode); - Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0)); - Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0)); - Assert(lock->nActive <= lock->nHolding); + Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); + Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); - if (!lock->nHolding) + if (!lock->nRequested) { /* ------------------ * if there's no one waiting in the queue, @@ -1180,10 +1180,10 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * Now fix the per-holder lock stats. */ - holder->holders[lockmode]--; + holder->holding[lockmode]--; holder->nHolding--; HOLDER_PRINT("LockRelease: updated", holder); - Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0)); + Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0)); /* * If this was my last hold on this lock, delete my entry in the holder @@ -1236,8 +1236,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, { HOLDER *holder = NULL; HOLDER *nextHolder = NULL; - SHM_QUEUE *lockQueue = &(proc->lockQueue); - SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); + SHM_QUEUE *holderQueue = &(proc->holderQueue); + SHMEM_OFFSET end = MAKE_OFFSET(holderQueue); SPINLOCK masterLock; LOCKMETHODTABLE *lockMethodTable; int i, @@ -1260,7 +1260,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, return FALSE; } - if (SHMQueueEmpty(lockQueue)) + if (SHMQueueEmpty(holderQueue)) return TRUE; numLockModes = lockMethodTable->ctl->numLockModes; @@ -1268,7 +1268,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, SpinAcquire(masterLock); - SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); + SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); nleft = 0; @@ -1308,53 +1308,55 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, HOLDER_PRINT("LockReleaseAll", holder); LOCK_PRINT("LockReleaseAll", lock, 0); - Assert(lock->nHolding > 0); - Assert(lock->nActive > 0); - Assert(lock->nActive <= lock->nHolding); + Assert(lock->nRequested >= 0); + Assert(lock->nGranted >= 0); + Assert(lock->nGranted <= lock->nRequested); Assert(holder->nHolding >= 0); - Assert(holder->nHolding <= lock->nHolding); + Assert(holder->nHolding <= lock->nRequested); /* ------------------ * fix the general lock stats * ------------------ */ - if (lock->nHolding != holder->nHolding) + if (lock->nRequested != holder->nHolding) { for (i = 1; i <= numLockModes; i++) { - Assert(holder->holders[i] >= 0); - lock->holders[i] -= holder->holders[i]; - lock->activeHolders[i] -= holder->holders[i]; - Assert((lock->holders[i] >= 0) \ - &&(lock->activeHolders[i] >= 0)); - if (!lock->activeHolders[i]) - lock->mask &= BITS_OFF[i]; - - /* - * Read comments in LockRelease - */ - if (!wakeupNeeded && holder->holders[i] > 0 && - lockMethodTable->ctl->conflictTab[i] & lock->waitMask) - wakeupNeeded = true; + Assert(holder->holding[i] >= 0); + if (holder->holding[i] > 0) + { + lock->requested[i] -= holder->holding[i]; + lock->granted[i] -= holder->holding[i]; + Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0); + if (lock->granted[i] == 0) + lock->grantMask &= BITS_OFF[i]; + /* + * Read comments in LockRelease + */ + if (!wakeupNeeded && + lockMethodTable->ctl->conflictTab[i] & lock->waitMask) + wakeupNeeded = true; + } } - lock->nHolding -= holder->nHolding; - lock->nActive -= holder->nHolding; - Assert((lock->nHolding >= 0) && (lock->nActive >= 0)); - Assert(lock->nActive <= lock->nHolding); + lock->nRequested -= holder->nHolding; + lock->nGranted -= holder->nHolding; + Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); + Assert(lock->nGranted <= lock->nRequested); } else { /* -------------- - * set nHolding to zero so that we can garbage collect the lock + * set nRequested to zero so that we can garbage collect the lock * down below... * -------------- */ - lock->nHolding = 0; + lock->nRequested = 0; + lock->nGranted = 0; /* Fix the lock status, just for next LOCK_PRINT message. */ for (i = 1; i <= numLockModes; i++) { - Assert(lock->holders[i] == lock->activeHolders[i]); - lock->holders[i] = lock->activeHolders[i] = 0; + Assert(lock->requested[i] == lock->granted[i]); + lock->requested[i] = lock->granted[i] = 0; } } LOCK_PRINT("LockReleaseAll: updated", lock, 0); @@ -1380,11 +1382,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, return FALSE; } - if (!lock->nHolding) + if (!lock->nRequested) { /* -------------------- - * if there's no one waiting in the queue, we've just released - * the last lock. + * We've just released the last lock, so garbage-collect the + * lock object. * -------------------- */ LOCK_PRINT("LockReleaseAll: deleting", lock, 0); @@ -1413,12 +1415,13 @@ next_item: { #ifdef LOCK_DEBUG if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) - elog(DEBUG, "LockReleaseAll: reinitializing lockQueue"); + elog(DEBUG, "LockReleaseAll: reinitializing holderQueue"); #endif - SHMQueueInit(lockQueue); + SHMQueueInit(holderQueue); } SpinRelease(masterLock); + #ifdef LOCK_DEBUG if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) elog(DEBUG, "LockReleaseAll: done"); @@ -1457,16 +1460,17 @@ LockShmemSize(int maxBackends) } /* - * DeadlockCheck -- Checks for deadlocks for a given process - * - * We can't block on user locks, so no sense testing for deadlock - * because there is no blocking, and no timer for the block. + * DeadLockCheck -- Checks for deadlocks for a given process * * This code takes a list of locks a process holds, and the lock that * the process is sleeping on, and tries to find if any of the processes * waiting on its locks hold the lock it is waiting for. If no deadlock * is found, it goes on to look at all the processes waiting on their locks. * + * We can't block on user locks, so no sense testing for deadlock + * because there is no blocking, and no timer for the block. So, + * only look at regular locks. + * * We have already locked the master lock before being called. */ bool @@ -1476,8 +1480,8 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) HOLDER *nextHolder = NULL; PROC *waitProc; PROC_QUEUE *waitQueue; - SHM_QUEUE *lockQueue = &(thisProc->lockQueue); - SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); + SHM_QUEUE *holderQueue = &(thisProc->holderQueue); + SHMEM_OFFSET end = MAKE_OFFSET(holderQueue); LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; LOCK *lock; int i, @@ -1494,10 +1498,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) nprocs = 1; } - if (SHMQueueEmpty(lockQueue)) + /* + * Scan over all the locks held/awaited by thisProc. + */ + if (SHMQueueEmpty(holderQueue)) return false; - SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); + SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); do { @@ -1525,7 +1532,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) LOCK_PRINT("DeadLockCheck", lock, 0); /* - * waitLock is always in lockQueue of waiting proc, if !first_run + * waitLock is always in holderQueue of waiting proc, if !first_run * then upper caller will handle waitProcs queue of waitLock. */ if (thisProc->waitLock == lock && !first_run) @@ -1542,13 +1549,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) Assert(holder->nHolding > 0); for (lm = 1; lm <= lockctl->numLockModes; lm++) { - if (holder->holders[lm] > 0 && + if (holder->holding[lm] > 0 && lockctl->conflictTab[lm] & findlock->waitMask) return true; } /* - * Else - get the next lock from thisProc's lockQueue + * Else - get the next lock from thisProc's holderQueue */ goto nxtl; } @@ -1557,6 +1564,8 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev); /* + * Inner loop scans over all processes waiting for this lock. + * * NOTE: loop must count down because we want to examine each item * in the queue even if waitQueue->size decreases due to waking up * some of the processes. @@ -1573,14 +1582,14 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) if (lock == findlock) /* first_run also true */ { /* - * If me blocked by his holdlock... + * If I'm blocked by his heldLocks... */ - if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->holdLock) + if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks) { /* and he blocked by me -> deadlock */ - if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) + if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) return true; - /* we shouldn't look at lockQueue of our blockers */ + /* we shouldn't look at holderQueue of our blockers */ goto nextWaitProc; } @@ -1591,11 +1600,11 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) * implicitly). Note that we don't do like test if * !first_run (when thisProc is holder and non-waiter on * lock) and so we call DeadLockCheck below for every - * waitProc in thisProc->lockQueue, even for waitProc-s + * waitProc in thisProc->holderQueue, even for waitProc-s * un-blocked by thisProc. Should we? This could save us * some time... */ - if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) && + if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) && !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode))) goto nextWaitProc; } @@ -1609,13 +1618,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) goto nextWaitProc; } - /* Recursively check this process's lockQueue. */ + /* Recursively check this process's holderQueue. */ Assert(nprocs < MAXBACKENDS); checked_procs[nprocs++] = waitProc; if (DeadLockCheck(waitProc, findlock)) { - int holdLock; + int heldLocks; /* * Ok, but is waitProc waiting for me (thisProc) ? @@ -1623,15 +1632,15 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock) if (thisProc->waitLock == lock) { Assert(first_run); - holdLock = thisProc->holdLock; + heldLocks = thisProc->heldLocks; } else { - /* should we cache holdLock to speed this up? */ - holdLock = LockGetMyHoldLocks(holder->tag.lock, thisProc); - Assert(holdLock != 0); + /* should we cache heldLocks to speed this up? */ + heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc); + Assert(heldLocks != 0); } - if (lockctl->conflictTab[waitProc->waitLockMode] & holdLock) + if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks) { /* * Last attempt to avoid deadlock: try to wakeup myself. @@ -1703,7 +1712,7 @@ nxtl: #ifdef LOCK_DEBUG /* - * Dump all locks in the proc->lockQueue. Must have already acquired + * Dump all locks in the proc->holderQueue. Must have already acquired * the masterLock. */ void @@ -1711,7 +1720,7 @@ DumpLocks(void) { SHMEM_OFFSET location; PROC *proc; - SHM_QUEUE *lockQueue; + SHM_QUEUE *holderQueue; HOLDER *holder = NULL; HOLDER *nextHolder = NULL; SHMEM_OFFSET end; @@ -1725,8 +1734,8 @@ DumpLocks(void) proc = (PROC *) MAKE_PTR(location); if (proc != MyProc) return; - lockQueue = &proc->lockQueue; - end = MAKE_OFFSET(lockQueue); + holderQueue = &proc->holderQueue; + end = MAKE_OFFSET(holderQueue); Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; @@ -1736,10 +1745,10 @@ DumpLocks(void) if (proc->waitLock) LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); - if (SHMQueueEmpty(lockQueue)) + if (SHMQueueEmpty(holderQueue)) return; - SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); + SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue); do { |
