diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
| -rw-r--r-- | src/backend/storage/lmgr/lock.c | 276 |
1 files changed, 125 insertions, 151 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 6f02c0720e..08e579486e 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.154 2005/05/29 22:45:02 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.155 2005/06/14 22:15:32 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -144,11 +144,10 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) { if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock))) elog(LOG, - "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)", + "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) hold(%x)", where, MAKE_OFFSET(proclockP), proclockP->tag.lock, PROCLOCK_LOCKMETHOD(*(proclockP)), - proclockP->tag.proc, proclockP->tag.xid, - (int) proclockP->holdMask); + proclockP->tag.proc, (int) proclockP->holdMask); } #else /* not LOCK_DEBUG */ @@ -162,8 +161,6 @@ static void RemoveLocalLock(LOCALLOCK *locallock); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, ResourceOwner owner); -static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, - int *myHolding); static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, PROCLOCK *proclock, LockMethod lockMethodTable); static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock, @@ -377,6 +374,15 @@ LockMethodTableRename(LOCKMETHODID lockmethodid) * LockAcquire -- Check for lock conflicts, sleep if conflict found, * set lock if/when no conflicts. * + * Inputs: + * lockmethodid: identifies which lock table to use + * locktag: unique identifier for the lockable object + * isTempObject: is the lockable object a temporary object? (Under 2PC, + * such locks cannot be persisted) + * lockmode: lock mode to acquire + * sessionLock: if true, acquire lock for session not current transaction + * dontWait: if true, don't wait to acquire lock + * * Returns one of: * LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true * LOCKACQUIRE_OK lock successfully acquired @@ -420,8 +426,12 @@ LockMethodTableRename(LOCKMETHODID lockmethodid) * DZ - 22 Nov 1997 */ LockAcquireResult -LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, - TransactionId xid, LOCKMODE lockmode, bool dontWait) +LockAcquire(LOCKMETHODID lockmethodid, + LOCKTAG *locktag, + bool isTempObject, + LOCKMODE lockmode, + bool sessionLock, + bool dontWait) { LOCALLOCKTAG localtag; LOCALLOCK *locallock; @@ -433,8 +443,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, LWLockId masterLock; LockMethod lockMethodTable; int status; - int myHolding[MAX_LOCKMODES]; - int i; #ifdef LOCK_DEBUG if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD) @@ -452,8 +460,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, elog(ERROR, "unrecognized lock method: %d", lockmethodid); /* Session locks and user locks are not transactional */ - if (xid != InvalidTransactionId && - lockmethodid == DEFAULT_LOCKMETHOD) + if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD) owner = CurrentResourceOwner; else owner = NULL; @@ -463,7 +470,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, */ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ localtag.lock = *locktag; - localtag.xid = xid; localtag.mode = lockmode; locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], @@ -477,6 +483,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, { locallock->lock = NULL; locallock->proclock = NULL; + locallock->isTempObject = isTempObject; locallock->nLocks = 0; locallock->numLockOwners = 0; locallock->maxLockOwners = 8; @@ -487,6 +494,8 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, } else { + Assert(locallock->isTempObject == isTempObject); + /* Make sure there will be room to remember the lock */ if (locallock->numLockOwners >= locallock->maxLockOwners) { @@ -563,10 +572,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * Create the hash key for the proclock table. */ - MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ proclocktag.lock = MAKE_OFFSET(lock); proclocktag.proc = MAKE_OFFSET(MyProc); - TransactionIdStore(xid, &proclocktag.xid); /* * Find or create a proclock entry with this tag @@ -605,6 +613,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, if (!found) { proclock->holdMask = 0; + proclock->releaseMask = 0; /* Add proclock to appropriate lists */ SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink); @@ -632,17 +641,22 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, * XXX Doing numeric comparison on the lockmodes is a hack; it'd be * better to use a table. For now, though, this works. */ - for (i = lockMethodTable->numLockModes; i > 0; i--) { - if (proclock->holdMask & LOCKBIT_ON(i)) + int i; + + for (i = lockMethodTable->numLockModes; i > 0; i--) { - if (i >= (int) lockmode) - break; /* safe: we have a lock >= req level */ - elog(LOG, "deadlock risk: raising lock level" - " from %s to %s on object %u/%u/%u", - lock_mode_names[i], lock_mode_names[lockmode], - lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno); - break; + if (proclock->holdMask & LOCKBIT_ON(i)) + { + if (i >= (int) lockmode) + break; /* safe: we have a lock >= req level */ + elog(LOG, "deadlock risk: raising lock level" + " from %s to %s on object %u/%u/%u", + lock_mode_names[i], lock_mode_names[lockmode], + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3); + break; + } } } #endif /* CHECK_DEADLOCK_RISK */ @@ -658,18 +672,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* - * If this process (under any XID) is a holder of the lock, just grant - * myself another one without blocking. + * We shouldn't already hold the desired lock; else locallock table + * is broken. */ - LockCountMyLocks(proclock->tag.lock, MyProc, myHolding); - if (myHolding[lockmode] > 0) - { - GrantLock(lock, proclock, lockmode); - GrantLockLocal(locallock, owner); - PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock); - LWLockRelease(masterLock); - return LOCKACQUIRE_ALREADY_HELD; - } + if (proclock->holdMask & LOCKBIT_ON(lockmode)) + elog(ERROR, "lock %s on object %u/%u/%u is already held", + lock_mode_names[lockmode], + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3); /* * If lock requested conflicts with locks requested by waiters, must @@ -680,8 +690,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, status = STATUS_FOUND; else status = LockCheckConflicts(lockMethodTable, lockmode, - lock, proclock, - MyProc, myHolding); + lock, proclock, MyProc); if (status == STATUS_OK) { @@ -723,18 +732,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, } /* - * Construct bitmask of locks this process holds on this object. + * Set bitmask of locks this process already holds on this object. */ - { - LOCKMASK heldLocks = 0; - - for (i = 1; i <= lockMethodTable->numLockModes; i++) - { - if (myHolding[i] > 0) - heldLocks |= LOCKBIT_ON(i); - } - MyProc->heldLocks = heldLocks; - } + MyProc->heldLocks = proclock->holdMask; /* * Sleep till someone wakes me up. @@ -793,26 +793,22 @@ RemoveLocalLock(LOCALLOCK *locallock) * * NOTES: * Here's what makes this complicated: one process's locks don't - * conflict with one another, even if they are held under different - * transaction IDs (eg, session and xact locks do not conflict). + * conflict with one another, no matter what purpose they are held for + * (eg, session and transaction locks do not conflict). * So, we must subtract off our own locks when determining whether the * requested new lock conflicts with those already held. - * - * The caller can optionally pass the process's total holding counts, if - * known. If NULL is passed then these values will be computed internally. */ int LockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock, - PGPROC *proc, - int *myHolding) /* myHolding[] array or NULL */ + PGPROC *proc) { int numLockModes = lockMethodTable->numLockModes; - LOCKMASK bitmask; + LOCKMASK myLocks; + LOCKMASK otherLocks; int i; - int localHolding[MAX_LOCKMODES]; /* * first check for global conflicts: If no locks conflict with my @@ -830,32 +826,26 @@ LockCheckConflicts(LockMethod lockMethodTable, } /* - * Rats. Something conflicts. But it could still be my own lock. We - * have to construct a conflict mask that does not reflect our own - * locks. Locks held by the current process under another XID also - * count as "our own locks". + * Rats. Something conflicts. But it could still be my own lock. + * We have to construct a conflict mask that does not reflect our own + * locks, but only lock types held by other processes. */ - if (myHolding == NULL) - { - /* Caller didn't do calculation of total holding for me */ - LockCountMyLocks(proclock->tag.lock, proc, localHolding); - myHolding = localHolding; - } - - /* Compute mask of lock types held by other processes */ - bitmask = 0; + myLocks = proclock->holdMask; + otherLocks = 0; for (i = 1; i <= numLockModes; i++) { - if (lock->granted[i] != myHolding[i]) - bitmask |= LOCKBIT_ON(i); + int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0; + + if (lock->granted[i] > myHolding) + otherLocks |= LOCKBIT_ON(i); } /* - * now check again for conflicts. 'bitmask' describes the types of + * now check again for conflicts. 'otherLocks' describes the types of * locks held by other processes. If one of these conflicts with the * kind of lock that I want, there is a conflict and I have to sleep. */ - if (!(lockMethodTable->conflictTab[lockmode] & bitmask)) + if (!(lockMethodTable->conflictTab[lockmode] & otherLocks)) { /* no conflict. OK to get the lock */ PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock); @@ -867,46 +857,6 @@ LockCheckConflicts(LockMethod lockMethodTable, } /* - * LockCountMyLocks --- Count total number of locks held on a given lockable - * object by a given process (under any transaction ID). - * - * XXX This could be rather slow if the process holds a large number of locks. - * Perhaps it could be sped up if we kept yet a third hashtable of per- - * process lock information. However, for the normal case where a transaction - * doesn't hold a large number of locks, keeping such a table would probably - * be a net slowdown. - */ -static void -LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding) -{ - SHM_QUEUE *procLocks = &(proc->procLocks); - PROCLOCK *proclock; - - MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int)); - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - - while (proclock) - { - if (lockOffset == proclock->tag.lock) - { - LOCKMASK holdMask = proclock->holdMask; - int i; - - for (i = 1; i < MAX_LOCKMODES; i++) - { - if (holdMask & LOCKBIT_ON(i)) - myHolding[i]++; - } - } - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, - offsetof(PROCLOCK, procLink)); - } -} - -/* * GrantLock -- update the lock and proclock data structures to show * the lock request has been granted. * @@ -1086,7 +1036,7 @@ GrantAwaitedLock(void) * WaitOnLock -- wait to acquire a lock * * Caller must have set MyProc->heldLocks to reflect locks already held - * on the lockable object by this process (under all XIDs). + * on the lockable object by this process. * * The locktable's masterLock must be held at entry. */ @@ -1212,7 +1162,8 @@ RemoveFromWaitQueue(PGPROC *proc) /* * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and - * release one 'lockmode' lock on it. + * release one 'lockmode' lock on it. Release a session lock if + * 'sessionLock' is true, else release a regular transaction lock. * * Side Effects: find any waiting processes that are now wakable, * grant them their requested locks and awaken them. @@ -1222,7 +1173,7 @@ RemoveFromWaitQueue(PGPROC *proc) */ bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, - TransactionId xid, LOCKMODE lockmode) + LOCKMODE lockmode, bool sessionLock) { LOCALLOCKTAG localtag; LOCALLOCK *locallock; @@ -1252,7 +1203,6 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, */ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ localtag.lock = *locktag; - localtag.xid = xid; localtag.mode = lockmode; locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], @@ -1279,8 +1229,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, int i; /* Session locks and user locks are not transactional */ - if (xid != InvalidTransactionId && - lockmethodid == DEFAULT_LOCKMETHOD) + if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD) owner = CurrentResourceOwner; else owner = NULL; @@ -1367,15 +1316,11 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, * are held by the current process. * * Well, not necessarily *all* locks. The available behaviors are: - * - * allxids == true: release all locks regardless of transaction - * affiliation. - * - * allxids == false: release all locks with Xid != 0 - * (zero is the Xid used for "session" locks). + * allLocks == true: release all locks including session locks. + * allLocks == false: release all non-session locks. */ void -LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) +LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) { HASH_SEQ_STATUS status; SHM_QUEUE *procLocks = &(MyProc->procLocks); @@ -1427,13 +1372,41 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) continue; /* - * Ignore locks with Xid=0 unless we are asked to release all - * locks + * If we are asked to release all locks, we can just zap the + * entry. Otherwise, must scan to see if there are session locks. + * We assume there is at most one lockOwners entry for session locks. */ - if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId) - && !allxids) - continue; + if (!allLocks) + { + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + + /* If it's above array position 0, move it down to 0 */ + for (i = locallock->numLockOwners - 1; i > 0; i--) + { + if (lockOwners[i].owner == NULL) + { + lockOwners[0] = lockOwners[i]; + break; + } + } + + if (locallock->numLockOwners > 0 && + lockOwners[0].owner == NULL && + lockOwners[0].nLocks > 0) + { + /* Fix the locallock to show just the session locks */ + locallock->nLocks = lockOwners[0].nLocks; + locallock->numLockOwners = 1; + /* We aren't deleting this locallock, so done */ + continue; + } + } + + /* Mark the proclock to show we need to release this lockmode */ + if (locallock->nLocks > 0) + locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode); + /* And remove the locallock hashtable entry */ RemoveLocalLock(locallock); } @@ -1460,11 +1433,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) goto next_item; /* - * Ignore locks with Xid=0 unless we are asked to release all - * locks + * In allLocks mode, force release of all locks even if locallock + * table had problems */ - if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId) - && !allxids) + if (allLocks) + proclock->releaseMask = proclock->holdMask; + else + Assert((proclock->releaseMask & ~proclock->holdMask) == 0); + + /* + * Ignore items that have nothing to be released, unless they have + * holdMask == 0 and are therefore recyclable + */ + if (proclock->releaseMask == 0 && proclock->holdMask != 0) goto next_item; PROCLOCK_PRINT("LockReleaseAll", proclock); @@ -1475,22 +1456,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) Assert((proclock->holdMask & ~lock->grantMask) == 0); /* - * fix the general lock stats + * Release the previously-marked lock modes */ - if (proclock->holdMask) + for (i = 1; i <= numLockModes; i++) { - for (i = 1; i <= numLockModes; i++) - { - if (proclock->holdMask & LOCKBIT_ON(i)) - wakeupNeeded |= UnGrantLock(lock, i, proclock, - lockMethodTable); - } + if (proclock->releaseMask & LOCKBIT_ON(i)) + wakeupNeeded |= UnGrantLock(lock, i, proclock, + lockMethodTable); } Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); Assert(lock->nGranted <= lock->nRequested); LOCK_PRINT("LockReleaseAll: updated", lock, 0); - Assert(proclock->holdMask == 0); + proclock->releaseMask = 0; /* CleanUpLock will wake up waiters if needed. */ CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded); @@ -1528,8 +1506,6 @@ LockReleaseCurrentOwner(void) /* Ignore items that must be nontransactional */ if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) continue; - if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) - continue; /* Scan to see if there are any locks belonging to current owner */ lockOwners = locallock->lockOwners; @@ -1558,8 +1534,8 @@ LockReleaseCurrentOwner(void) locallock->nLocks = 1; if (!LockRelease(DEFAULT_LOCKMETHOD, &locallock->tag.lock, - locallock->tag.xid, - locallock->tag.mode)) + locallock->tag.mode, + false)) elog(WARNING, "LockReleaseCurrentOwner: failed??"); } break; @@ -1594,8 +1570,6 @@ LockReassignCurrentOwner(void) /* Ignore items that must be nontransactional */ if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) continue; - if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) - continue; /* * Scan to see if there are any locks belonging to current owner |
