summaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r--src/backend/storage/lmgr/lock.c365
1 files changed, 187 insertions, 178 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index e7d1b678be..fa2b98cc46 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.77 2001/01/14 05:08:15 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.78 2001/01/16 06:11:34 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -43,8 +43,8 @@
static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
- int *myHolders);
-static int LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
+ int *myHolding);
+static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
static char *lock_types[] =
{
@@ -91,7 +91,7 @@ LOCK_DEBUG_ENABLED(const LOCK * lock)
return
(((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
|| (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
- && (lock->tag.relId >= Trace_lock_oidmin))
+ && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
|| (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
}
@@ -101,17 +101,18 @@ LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
{
if (LOCK_DEBUG_ENABLED(lock))
elog(DEBUG,
- "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) "
- "hold(%d,%d,%d,%d,%d,%d,%d)=%d "
- "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
+ "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
+ "req(%d,%d,%d,%d,%d,%d,%d)=%d "
+ "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
where, MAKE_OFFSET(lock),
lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
- lock->tag.objId.blkno, lock->mask,
- lock->holders[1], lock->holders[2], lock->holders[3], lock->holders[4],
- lock->holders[5], lock->holders[6], lock->holders[7], lock->nHolding,
- lock->activeHolders[1], lock->activeHolders[2], lock->activeHolders[3],
- lock->activeHolders[4], lock->activeHolders[5], lock->activeHolders[6],
- lock->activeHolders[7], lock->nActive,
+ lock->tag.objId.blkno, lock->grantMask,
+ lock->requested[1], lock->requested[2], lock->requested[3],
+ lock->requested[4], lock->requested[5], lock->requested[6],
+ lock->requested[7], lock->nRequested,
+ lock->granted[1], lock->granted[2], lock->granted[3],
+ lock->granted[4], lock->granted[5], lock->granted[6],
+ lock->granted[7], lock->nGranted,
lock->waitProcs.size, lock_types[type]);
}
@@ -122,7 +123,7 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
if (
(((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
|| (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
- && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= Trace_lock_oidmin))
+ && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
|| (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
)
elog(DEBUG,
@@ -130,8 +131,9 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
where, MAKE_OFFSET(holderP), holderP->tag.lock,
HOLDER_LOCKMETHOD(*(holderP)),
holderP->tag.pid, holderP->tag.xid,
- holderP->holders[1], holderP->holders[2], holderP->holders[3], holderP->holders[4],
- holderP->holders[5], holderP->holders[6], holderP->holders[7], holderP->nHolding);
+ holderP->holding[1], holderP->holding[2], holderP->holding[3],
+ holderP->holding[4], holderP->holding[5], holderP->holding[6],
+ holderP->holding[7], holderP->nHolding);
}
#else /* not LOCK_DEBUG */
@@ -146,7 +148,12 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
SPINLOCK LockMgrLock; /* in Shmem or created in
* CreateSpinlocks() */
-/* This is to simplify/speed up some bit arithmetic */
+/*
+ * These are to simplify/speed up some bit arithmetic.
+ *
+ * XXX is a fetch from a static array really faster than a shift?
+ * Wouldn't bet on it...
+ */
static LOCKMASK BITS_OFF[MAX_LOCKMODES];
static LOCKMASK BITS_ON[MAX_LOCKMODES];
@@ -471,7 +478,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
int status;
- int myHolders[MAX_LOCKMODES];
+ int myHolding[MAX_LOCKMODES];
int i;
#ifdef LOCK_DEBUG
@@ -517,20 +524,21 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/
if (!found)
{
- lock->mask = 0;
- lock->nHolding = 0;
- lock->nActive = 0;
- MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
- MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
+ lock->grantMask = 0;
+ lock->waitMask = 0;
+ lock->nRequested = 0;
+ lock->nGranted = 0;
+ MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
+ MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
ProcQueueInit(&(lock->waitProcs));
LOCK_PRINT("LockAcquire: new", lock, lockmode);
}
else
{
LOCK_PRINT("LockAcquire: found", lock, lockmode);
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
- Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
+ Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
+ Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
}
/* ------------------
@@ -561,15 +569,15 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
if (!found)
{
holder->nHolding = 0;
- MemSet((char *) holder->holders, 0, sizeof(int) * MAX_LOCKMODES);
+ MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&holder->queue);
HOLDER_PRINT("LockAcquire: new", holder);
}
else
{
HOLDER_PRINT("LockAcquire: found", holder);
- Assert((holder->nHolding > 0) && (holder->holders[lockmode] >= 0));
- Assert(holder->nHolding <= lock->nActive);
+ Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
+ Assert(holder->nHolding <= lock->nGranted);
#ifdef CHECK_DEADLOCK_RISK
/*
@@ -588,7 +596,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/
for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
{
- if (holder->holders[i] > 0)
+ if (holder->holding[i] > 0)
{
if (i >= (int) lockmode)
break; /* safe: we have a lock >= req level */
@@ -603,21 +611,21 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
}
/* ----------------
- * lock->nHolding and lock->holders count the total number of holders
- * either holding or waiting for the lock, so increment those immediately.
+ * lock->nRequested and lock->requested[] count the total number of
+ * requests, whether granted or waiting, so increment those immediately.
* The other counts don't increment till we get the lock.
* ----------------
*/
- lock->nHolding++;
- lock->holders[lockmode]++;
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
+ lock->nRequested++;
+ lock->requested[lockmode]++;
+ Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/* --------------------
* If I'm the only one holding any lock on this object, then there
* cannot be a conflict. The same is true if I already hold this lock.
* --------------------
*/
- if (holder->nHolding == lock->nActive || holder->holders[lockmode] != 0)
+ if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0)
{
GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: owning", holder);
@@ -630,8 +638,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* then there is no conflict, either.
* --------------------
*/
- LockCountMyLocks(holder->tag.lock, MyProc, myHolders);
- if (myHolders[lockmode] != 0)
+ LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
+ if (myHolding[lockmode] != 0)
{
GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: my other XID owning", holder);
@@ -650,7 +658,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/
for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
{
- if (myHolders[i] > 0 &&
+ if (myHolding[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
break; /* yes, there is a conflict */
}
@@ -664,12 +672,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
else
status = LockResolveConflicts(lockmethod, lockmode,
lock, holder,
- MyProc, myHolders);
+ MyProc, myHolding);
}
else
status = LockResolveConflicts(lockmethod, lockmode,
lock, holder,
- MyProc, myHolders);
+ MyProc, myHolding);
if (status == STATUS_OK)
GrantLock(lock, holder, lockmode);
@@ -694,11 +702,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
}
else
HOLDER_PRINT("LockAcquire: NHOLDING", holder);
- lock->nHolding--;
- lock->holders[lockmode]--;
+ lock->nRequested--;
+ lock->requested[lockmode]--;
LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
+ Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
SpinRelease(masterLock);
return FALSE;
}
@@ -708,17 +716,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* Construct bitmask of locks this process holds on this object.
*/
{
- int holdLock = 0;
+ int heldLocks = 0;
int tmpMask;
for (i = 1, tmpMask = 2;
i <= lockMethodTable->ctl->numLockModes;
i++, tmpMask <<= 1)
{
- if (myHolders[i] > 0)
- holdLock |= tmpMask;
+ if (myHolding[i] > 0)
+ heldLocks |= tmpMask;
}
- MyProc->holdLock = holdLock;
+ MyProc->heldLocks = heldLocks;
}
/*
@@ -736,7 +744,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* Check the holder entry status, in case something in the ipc
* communication doesn't work correctly.
*/
- if (!((holder->nHolding > 0) && (holder->holders[lockmode] > 0)))
+ if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
{
HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
@@ -763,7 +771,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* determining whether or not any new lock acquired conflicts with
* the old ones.
*
- * The caller can optionally pass the process's total holders counts, if
+ * The caller can optionally pass the process's total holding counts, if
* known. If NULL is passed then these values will be computed internally.
* ----------------------------
*/
@@ -773,28 +781,28 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
LOCK *lock,
HOLDER *holder,
PROC *proc,
- int *myHolders) /* myHolders[] array or NULL */
+ int *myHolding) /* myHolding[] array or NULL */
{
LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
int numLockModes = lockctl->numLockModes;
int bitmask;
int i,
tmpMask;
- int localHolders[MAX_LOCKMODES];
+ int localHolding[MAX_LOCKMODES];
- Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
+ Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
/* ----------------------------
* first check for global conflicts: If no locks conflict
* with mine, then I get the lock.
*
- * Checking for conflict: lock->mask represents the types of
+ * Checking for conflict: lock->grantMask represents the types of
* currently held locks. conflictTable[lockmode] has a bit
* set for each type of lock that conflicts with mine. Bitwise
* compare tells if there is a conflict.
* ----------------------------
*/
- if (!(lockctl->conflictTab[lockmode] & lock->mask))
+ if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
{
HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
return STATUS_OK;
@@ -807,11 +815,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* process under another XID also count as "our own locks".
* ------------------------
*/
- if (myHolders == NULL)
+ if (myHolding == NULL)
{
/* Caller didn't do calculation of total holding for me */
- LockCountMyLocks(holder->tag.lock, proc, localHolders);
- myHolders = localHolders;
+ LockCountMyLocks(holder->tag.lock, proc, localHolding);
+ myHolding = localHolding;
}
/* Compute mask of lock types held by other processes */
@@ -819,7 +827,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
{
- if (lock->activeHolders[i] != myHolders[i])
+ if (lock->granted[i] != myHolding[i])
bitmask |= tmpMask;
}
@@ -852,20 +860,20 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* be a net slowdown.
*/
static void
-LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
+LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
{
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
- SHM_QUEUE *lockQueue = &(proc->lockQueue);
- SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
+ SHM_QUEUE *holderQueue = &(proc->holderQueue);
+ SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
int i;
- MemSet(myHolders, 0, MAX_LOCKMODES * sizeof(int));
+ MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
- if (SHMQueueEmpty(lockQueue))
+ if (SHMQueueEmpty(holderQueue))
return;
- SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
+ SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do
{
@@ -885,7 +893,7 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
{
for (i = 1; i < MAX_LOCKMODES; i++)
{
- myHolders[i] += holder->holders[i];
+ myHolding[i] += holder->holding[i];
}
}
@@ -894,47 +902,47 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
}
/*
- * LockGetMyHoldLocks -- compute bitmask of lock types held by a process
+ * LockGetMyHeldLocks -- compute bitmask of lock types held by a process
* for a given lockable object.
*/
static int
-LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
+LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
{
- int myHolders[MAX_LOCKMODES];
- int holdLock = 0;
+ int myHolding[MAX_LOCKMODES];
+ int heldLocks = 0;
int i,
tmpMask;
- LockCountMyLocks(lockOffset, proc, myHolders);
+ LockCountMyLocks(lockOffset, proc, myHolding);
for (i = 1, tmpMask = 2;
i < MAX_LOCKMODES;
i++, tmpMask <<= 1)
{
- if (myHolders[i] > 0)
- holdLock |= tmpMask;
+ if (myHolding[i] > 0)
+ heldLocks |= tmpMask;
}
- return holdLock;
+ return heldLocks;
}
/*
* GrantLock -- update the lock and holder data structures to show
- * the new lock has been granted.
+ * the lock request has been granted.
*/
void
GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
{
- lock->nActive++;
- lock->activeHolders[lockmode]++;
- lock->mask |= BITS_ON[lockmode];
- if (lock->activeHolders[lockmode] == lock->holders[lockmode])
+ lock->nGranted++;
+ lock->granted[lockmode]++;
+ lock->grantMask |= BITS_ON[lockmode];
+ if (lock->granted[lockmode] == lock->requested[lockmode])
lock->waitMask &= BITS_OFF[lockmode];
LOCK_PRINT("GrantLock", lock, lockmode);
- Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
- Assert(lock->nActive <= lock->nHolding);
- holder->holders[lockmode]++;
+ Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
+ Assert(lock->nGranted <= lock->nRequested);
+ holder->holding[lockmode]++;
holder->nHolding++;
- Assert((holder->nHolding > 0) && (holder->holders[lockmode] > 0));
+ Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
}
/*
@@ -952,14 +960,6 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
Assert(lockmethod < NumLockMethods);
- /*
- * the waitqueue is ordered by priority. I insert myself according to
- * the priority of the lock I am acquiring.
- *
- * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
- * synchronization for this queue. That will not be true if/when
- * people can be deleted from the queue by a SIGINT or something.
- */
LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
old_status = pstrdup(get_ps_display());
@@ -971,7 +971,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
/*
* NOTE: Think not to put any lock state cleanup after the call to
* ProcSleep, in either the normal or failure path. The lock state
- * must be fully set by the lock grantor, or by HandleDeadlock if we
+ * must be fully set by the lock grantor, or by HandleDeadLock if we
* give up waiting for the lock. This is necessary because of the
* possibility that a cancel/die interrupt will interrupt ProcSleep
* after someone else grants us the lock, but before we've noticed it.
@@ -1074,9 +1074,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
return FALSE;
}
LOCK_PRINT("LockRelease: found", lock, lockmode);
- Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
- Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
/*
* Find the holder entry for this holder.
@@ -1107,29 +1104,32 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* Check that we are actually holding a lock of the type we want to
* release.
*/
- if (!(holder->holders[lockmode] > 0))
+ if (!(holder->holding[lockmode] > 0))
{
SpinRelease(masterLock);
HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]);
- Assert(holder->holders[lockmode] >= 0);
+ Assert(holder->holding[lockmode] >= 0);
return FALSE;
}
Assert(holder->nHolding > 0);
+ Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
+ Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
+ Assert(lock->nGranted <= lock->nRequested);
/*
* fix the general lock stats
*/
- lock->nHolding--;
- lock->holders[lockmode]--;
- lock->nActive--;
- lock->activeHolders[lockmode]--;
+ lock->nRequested--;
+ lock->requested[lockmode]--;
+ lock->nGranted--;
+ lock->granted[lockmode]--;
- if (!(lock->activeHolders[lockmode]))
+ if (lock->granted[lockmode] == 0)
{
/* change the conflict mask. No more of this lock type. */
- lock->mask &= BITS_OFF[lockmode];
+ lock->grantMask &= BITS_OFF[lockmode];
}
#ifdef NOT_USED
@@ -1139,14 +1139,14 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* with the remaining locks.
* --------------------------
*/
- if (lock->activeHolders[lockmode])
+ if (lock->granted[lockmode])
wakeupNeeded = false;
else
#endif
/*
* Above is not valid any more (due to MVCC lock modes). Actually
- * we should compare activeHolders[lockmode] with number of
+ * we should compare granted[lockmode] with number of
* waiters holding lock of this type and try to wakeup only if
* these numbers are equal (and lock released conflicts with locks
* requested by waiters). For the moment we only check the last
@@ -1156,11 +1156,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
wakeupNeeded = true;
LOCK_PRINT("LockRelease: updated", lock, lockmode);
- Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
- Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
- Assert(lock->nActive <= lock->nHolding);
+ Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
+ Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
- if (!lock->nHolding)
+ if (!lock->nRequested)
{
/* ------------------
* if there's no one waiting in the queue,
@@ -1180,10 +1180,10 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
/*
* Now fix the per-holder lock stats.
*/
- holder->holders[lockmode]--;
+ holder->holding[lockmode]--;
holder->nHolding--;
HOLDER_PRINT("LockRelease: updated", holder);
- Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
+ Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
/*
* If this was my last hold on this lock, delete my entry in the holder
@@ -1236,8 +1236,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
{
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
- SHM_QUEUE *lockQueue = &(proc->lockQueue);
- SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
+ SHM_QUEUE *holderQueue = &(proc->holderQueue);
+ SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
int i,
@@ -1260,7 +1260,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
return FALSE;
}
- if (SHMQueueEmpty(lockQueue))
+ if (SHMQueueEmpty(holderQueue))
return TRUE;
numLockModes = lockMethodTable->ctl->numLockModes;
@@ -1268,7 +1268,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
SpinAcquire(masterLock);
- SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
+ SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
nleft = 0;
@@ -1308,53 +1308,55 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
HOLDER_PRINT("LockReleaseAll", holder);
LOCK_PRINT("LockReleaseAll", lock, 0);
- Assert(lock->nHolding > 0);
- Assert(lock->nActive > 0);
- Assert(lock->nActive <= lock->nHolding);
+ Assert(lock->nRequested >= 0);
+ Assert(lock->nGranted >= 0);
+ Assert(lock->nGranted <= lock->nRequested);
Assert(holder->nHolding >= 0);
- Assert(holder->nHolding <= lock->nHolding);
+ Assert(holder->nHolding <= lock->nRequested);
/* ------------------
* fix the general lock stats
* ------------------
*/
- if (lock->nHolding != holder->nHolding)
+ if (lock->nRequested != holder->nHolding)
{
for (i = 1; i <= numLockModes; i++)
{
- Assert(holder->holders[i] >= 0);
- lock->holders[i] -= holder->holders[i];
- lock->activeHolders[i] -= holder->holders[i];
- Assert((lock->holders[i] >= 0) \
- &&(lock->activeHolders[i] >= 0));
- if (!lock->activeHolders[i])
- lock->mask &= BITS_OFF[i];
-
- /*
- * Read comments in LockRelease
- */
- if (!wakeupNeeded && holder->holders[i] > 0 &&
- lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
- wakeupNeeded = true;
+ Assert(holder->holding[i] >= 0);
+ if (holder->holding[i] > 0)
+ {
+ lock->requested[i] -= holder->holding[i];
+ lock->granted[i] -= holder->holding[i];
+ Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
+ if (lock->granted[i] == 0)
+ lock->grantMask &= BITS_OFF[i];
+ /*
+ * Read comments in LockRelease
+ */
+ if (!wakeupNeeded &&
+ lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+ wakeupNeeded = true;
+ }
}
- lock->nHolding -= holder->nHolding;
- lock->nActive -= holder->nHolding;
- Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
- Assert(lock->nActive <= lock->nHolding);
+ lock->nRequested -= holder->nHolding;
+ lock->nGranted -= holder->nHolding;
+ Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
}
else
{
/* --------------
- * set nHolding to zero so that we can garbage collect the lock
+ * set nRequested to zero so that we can garbage collect the lock
* down below...
* --------------
*/
- lock->nHolding = 0;
+ lock->nRequested = 0;
+ lock->nGranted = 0;
/* Fix the lock status, just for next LOCK_PRINT message. */
for (i = 1; i <= numLockModes; i++)
{
- Assert(lock->holders[i] == lock->activeHolders[i]);
- lock->holders[i] = lock->activeHolders[i] = 0;
+ Assert(lock->requested[i] == lock->granted[i]);
+ lock->requested[i] = lock->granted[i] = 0;
}
}
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
@@ -1380,11 +1382,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
return FALSE;
}
- if (!lock->nHolding)
+ if (!lock->nRequested)
{
/* --------------------
- * if there's no one waiting in the queue, we've just released
- * the last lock.
+ * We've just released the last lock, so garbage-collect the
+ * lock object.
* --------------------
*/
LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
@@ -1413,12 +1415,13 @@ next_item:
{
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
- elog(DEBUG, "LockReleaseAll: reinitializing lockQueue");
+ elog(DEBUG, "LockReleaseAll: reinitializing holderQueue");
#endif
- SHMQueueInit(lockQueue);
+ SHMQueueInit(holderQueue);
}
SpinRelease(masterLock);
+
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(DEBUG, "LockReleaseAll: done");
@@ -1457,16 +1460,17 @@ LockShmemSize(int maxBackends)
}
/*
- * DeadlockCheck -- Checks for deadlocks for a given process
- *
- * We can't block on user locks, so no sense testing for deadlock
- * because there is no blocking, and no timer for the block.
+ * DeadLockCheck -- Checks for deadlocks for a given process
*
* This code takes a list of locks a process holds, and the lock that
* the process is sleeping on, and tries to find if any of the processes
* waiting on its locks hold the lock it is waiting for. If no deadlock
* is found, it goes on to look at all the processes waiting on their locks.
*
+ * We can't block on user locks, so no sense testing for deadlock
+ * because there is no blocking, and no timer for the block. So,
+ * only look at regular locks.
+ *
* We have already locked the master lock before being called.
*/
bool
@@ -1476,8 +1480,8 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
HOLDER *nextHolder = NULL;
PROC *waitProc;
PROC_QUEUE *waitQueue;
- SHM_QUEUE *lockQueue = &(thisProc->lockQueue);
- SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
+ SHM_QUEUE *holderQueue = &(thisProc->holderQueue);
+ SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
LOCK *lock;
int i,
@@ -1494,10 +1498,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
nprocs = 1;
}
- if (SHMQueueEmpty(lockQueue))
+ /*
+ * Scan over all the locks held/awaited by thisProc.
+ */
+ if (SHMQueueEmpty(holderQueue))
return false;
- SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
+ SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do
{
@@ -1525,7 +1532,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
LOCK_PRINT("DeadLockCheck", lock, 0);
/*
- * waitLock is always in lockQueue of waiting proc, if !first_run
+ * waitLock is always in holderQueue of waiting proc, if !first_run
* then upper caller will handle waitProcs queue of waitLock.
*/
if (thisProc->waitLock == lock && !first_run)
@@ -1542,13 +1549,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
Assert(holder->nHolding > 0);
for (lm = 1; lm <= lockctl->numLockModes; lm++)
{
- if (holder->holders[lm] > 0 &&
+ if (holder->holding[lm] > 0 &&
lockctl->conflictTab[lm] & findlock->waitMask)
return true;
}
/*
- * Else - get the next lock from thisProc's lockQueue
+ * Else - get the next lock from thisProc's holderQueue
*/
goto nxtl;
}
@@ -1557,6 +1564,8 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/*
+ * Inner loop scans over all processes waiting for this lock.
+ *
* NOTE: loop must count down because we want to examine each item
* in the queue even if waitQueue->size decreases due to waking up
* some of the processes.
@@ -1573,14 +1582,14 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
if (lock == findlock) /* first_run also true */
{
/*
- * If me blocked by his holdlock...
+ * If I'm blocked by his heldLocks...
*/
- if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->holdLock)
+ if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks)
{
/* and he blocked by me -> deadlock */
- if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock)
+ if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
return true;
- /* we shouldn't look at lockQueue of our blockers */
+ /* we shouldn't look at holderQueue of our blockers */
goto nextWaitProc;
}
@@ -1591,11 +1600,11 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
* implicitly). Note that we don't do like test if
* !first_run (when thisProc is holder and non-waiter on
* lock) and so we call DeadLockCheck below for every
- * waitProc in thisProc->lockQueue, even for waitProc-s
+ * waitProc in thisProc->holderQueue, even for waitProc-s
* un-blocked by thisProc. Should we? This could save us
* some time...
*/
- if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) &&
+ if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) &&
!(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
goto nextWaitProc;
}
@@ -1609,13 +1618,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
goto nextWaitProc;
}
- /* Recursively check this process's lockQueue. */
+ /* Recursively check this process's holderQueue. */
Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = waitProc;
if (DeadLockCheck(waitProc, findlock))
{
- int holdLock;
+ int heldLocks;
/*
* Ok, but is waitProc waiting for me (thisProc) ?
@@ -1623,15 +1632,15 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
if (thisProc->waitLock == lock)
{
Assert(first_run);
- holdLock = thisProc->holdLock;
+ heldLocks = thisProc->heldLocks;
}
else
{
- /* should we cache holdLock to speed this up? */
- holdLock = LockGetMyHoldLocks(holder->tag.lock, thisProc);
- Assert(holdLock != 0);
+ /* should we cache heldLocks to speed this up? */
+ heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc);
+ Assert(heldLocks != 0);
}
- if (lockctl->conflictTab[waitProc->waitLockMode] & holdLock)
+ if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks)
{
/*
* Last attempt to avoid deadlock: try to wakeup myself.
@@ -1703,7 +1712,7 @@ nxtl:
#ifdef LOCK_DEBUG
/*
- * Dump all locks in the proc->lockQueue. Must have already acquired
+ * Dump all locks in the proc->holderQueue. Must have already acquired
* the masterLock.
*/
void
@@ -1711,7 +1720,7 @@ DumpLocks(void)
{
SHMEM_OFFSET location;
PROC *proc;
- SHM_QUEUE *lockQueue;
+ SHM_QUEUE *holderQueue;
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
SHMEM_OFFSET end;
@@ -1725,8 +1734,8 @@ DumpLocks(void)
proc = (PROC *) MAKE_PTR(location);
if (proc != MyProc)
return;
- lockQueue = &proc->lockQueue;
- end = MAKE_OFFSET(lockQueue);
+ holderQueue = &proc->holderQueue;
+ end = MAKE_OFFSET(holderQueue);
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
@@ -1736,10 +1745,10 @@ DumpLocks(void)
if (proc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
- if (SHMQueueEmpty(lockQueue))
+ if (SHMQueueEmpty(holderQueue))
return;
- SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
+ SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do
{