summaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r--src/backend/storage/lmgr/lock.c276
1 files changed, 125 insertions, 151 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 6f02c0720e..08e579486e 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.154 2005/05/29 22:45:02 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.155 2005/06/14 22:15:32 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -144,11 +144,10 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock)))
elog(LOG,
- "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)",
+ "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) hold(%x)",
where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
PROCLOCK_LOCKMETHOD(*(proclockP)),
- proclockP->tag.proc, proclockP->tag.xid,
- (int) proclockP->holdMask);
+ proclockP->tag.proc, (int) proclockP->holdMask);
}
#else /* not LOCK_DEBUG */
@@ -162,8 +161,6 @@ static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
ResourceOwner owner);
-static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
- int *myHolding);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
@@ -377,6 +374,15 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* LockAcquire -- Check for lock conflicts, sleep if conflict found,
* set lock if/when no conflicts.
*
+ * Inputs:
+ * lockmethodid: identifies which lock table to use
+ * locktag: unique identifier for the lockable object
+ * isTempObject: is the lockable object a temporary object? (Under 2PC,
+ * such locks cannot be persisted)
+ * lockmode: lock mode to acquire
+ * sessionLock: if true, acquire lock for session not current transaction
+ * dontWait: if true, don't wait to acquire lock
+ *
* Returns one of:
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
@@ -420,8 +426,12 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* DZ - 22 Nov 1997
*/
LockAcquireResult
-LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
- TransactionId xid, LOCKMODE lockmode, bool dontWait)
+LockAcquire(LOCKMETHODID lockmethodid,
+ LOCKTAG *locktag,
+ bool isTempObject,
+ LOCKMODE lockmode,
+ bool sessionLock,
+ bool dontWait)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
@@ -433,8 +443,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
LWLockId masterLock;
LockMethod lockMethodTable;
int status;
- int myHolding[MAX_LOCKMODES];
- int i;
#ifdef LOCK_DEBUG
if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
@@ -452,8 +460,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
/* Session locks and user locks are not transactional */
- if (xid != InvalidTransactionId &&
- lockmethodid == DEFAULT_LOCKMETHOD)
+ if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD)
owner = CurrentResourceOwner;
else
owner = NULL;
@@ -463,7 +470,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
- localtag.xid = xid;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
@@ -477,6 +483,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
{
locallock->lock = NULL;
locallock->proclock = NULL;
+ locallock->isTempObject = isTempObject;
locallock->nLocks = 0;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
@@ -487,6 +494,8 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
}
else
{
+ Assert(locallock->isTempObject == isTempObject);
+
/* Make sure there will be room to remember the lock */
if (locallock->numLockOwners >= locallock->maxLockOwners)
{
@@ -563,10 +572,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
/*
* Create the hash key for the proclock table.
*/
- MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(MyProc);
- TransactionIdStore(xid, &proclocktag.xid);
/*
* Find or create a proclock entry with this tag
@@ -605,6 +613,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
if (!found)
{
proclock->holdMask = 0;
+ proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
@@ -632,17 +641,22 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* XXX Doing numeric comparison on the lockmodes is a hack; it'd be
* better to use a table. For now, though, this works.
*/
- for (i = lockMethodTable->numLockModes; i > 0; i--)
{
- if (proclock->holdMask & LOCKBIT_ON(i))
+ int i;
+
+ for (i = lockMethodTable->numLockModes; i > 0; i--)
{
- if (i >= (int) lockmode)
- break; /* safe: we have a lock >= req level */
- elog(LOG, "deadlock risk: raising lock level"
- " from %s to %s on object %u/%u/%u",
- lock_mode_names[i], lock_mode_names[lockmode],
- lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
- break;
+ if (proclock->holdMask & LOCKBIT_ON(i))
+ {
+ if (i >= (int) lockmode)
+ break; /* safe: we have a lock >= req level */
+ elog(LOG, "deadlock risk: raising lock level"
+ " from %s to %s on object %u/%u/%u",
+ lock_mode_names[i], lock_mode_names[lockmode],
+ lock->tag.locktag_field1, lock->tag.locktag_field2,
+ lock->tag.locktag_field3);
+ break;
+ }
}
}
#endif /* CHECK_DEADLOCK_RISK */
@@ -658,18 +672,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/*
- * If this process (under any XID) is a holder of the lock, just grant
- * myself another one without blocking.
+ * We shouldn't already hold the desired lock; else locallock table
+ * is broken.
*/
- LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
- if (myHolding[lockmode] > 0)
- {
- GrantLock(lock, proclock, lockmode);
- GrantLockLocal(locallock, owner);
- PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
- LWLockRelease(masterLock);
- return LOCKACQUIRE_ALREADY_HELD;
- }
+ if (proclock->holdMask & LOCKBIT_ON(lockmode))
+ elog(ERROR, "lock %s on object %u/%u/%u is already held",
+ lock_mode_names[lockmode],
+ lock->tag.locktag_field1, lock->tag.locktag_field2,
+ lock->tag.locktag_field3);
/*
* If lock requested conflicts with locks requested by waiters, must
@@ -680,8 +690,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
status = STATUS_FOUND;
else
status = LockCheckConflicts(lockMethodTable, lockmode,
- lock, proclock,
- MyProc, myHolding);
+ lock, proclock, MyProc);
if (status == STATUS_OK)
{
@@ -723,18 +732,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
}
/*
- * Construct bitmask of locks this process holds on this object.
+ * Set bitmask of locks this process already holds on this object.
*/
- {
- LOCKMASK heldLocks = 0;
-
- for (i = 1; i <= lockMethodTable->numLockModes; i++)
- {
- if (myHolding[i] > 0)
- heldLocks |= LOCKBIT_ON(i);
- }
- MyProc->heldLocks = heldLocks;
- }
+ MyProc->heldLocks = proclock->holdMask;
/*
* Sleep till someone wakes me up.
@@ -793,26 +793,22 @@ RemoveLocalLock(LOCALLOCK *locallock)
*
* NOTES:
* Here's what makes this complicated: one process's locks don't
- * conflict with one another, even if they are held under different
- * transaction IDs (eg, session and xact locks do not conflict).
+ * conflict with one another, no matter what purpose they are held for
+ * (eg, session and transaction locks do not conflict).
* So, we must subtract off our own locks when determining whether the
* requested new lock conflicts with those already held.
- *
- * The caller can optionally pass the process's total holding counts, if
- * known. If NULL is passed then these values will be computed internally.
*/
int
LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock,
PROCLOCK *proclock,
- PGPROC *proc,
- int *myHolding) /* myHolding[] array or NULL */
+ PGPROC *proc)
{
int numLockModes = lockMethodTable->numLockModes;
- LOCKMASK bitmask;
+ LOCKMASK myLocks;
+ LOCKMASK otherLocks;
int i;
- int localHolding[MAX_LOCKMODES];
/*
* first check for global conflicts: If no locks conflict with my
@@ -830,32 +826,26 @@ LockCheckConflicts(LockMethod lockMethodTable,
}
/*
- * Rats. Something conflicts. But it could still be my own lock. We
- * have to construct a conflict mask that does not reflect our own
- * locks. Locks held by the current process under another XID also
- * count as "our own locks".
+ * Rats. Something conflicts. But it could still be my own lock.
+ * We have to construct a conflict mask that does not reflect our own
+ * locks, but only lock types held by other processes.
*/
- if (myHolding == NULL)
- {
- /* Caller didn't do calculation of total holding for me */
- LockCountMyLocks(proclock->tag.lock, proc, localHolding);
- myHolding = localHolding;
- }
-
- /* Compute mask of lock types held by other processes */
- bitmask = 0;
+ myLocks = proclock->holdMask;
+ otherLocks = 0;
for (i = 1; i <= numLockModes; i++)
{
- if (lock->granted[i] != myHolding[i])
- bitmask |= LOCKBIT_ON(i);
+ int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
+
+ if (lock->granted[i] > myHolding)
+ otherLocks |= LOCKBIT_ON(i);
}
/*
- * now check again for conflicts. 'bitmask' describes the types of
+ * now check again for conflicts. 'otherLocks' describes the types of
* locks held by other processes. If one of these conflicts with the
* kind of lock that I want, there is a conflict and I have to sleep.
*/
- if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
+ if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
{
/* no conflict. OK to get the lock */
PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
@@ -867,46 +857,6 @@ LockCheckConflicts(LockMethod lockMethodTable,
}
/*
- * LockCountMyLocks --- Count total number of locks held on a given lockable
- * object by a given process (under any transaction ID).
- *
- * XXX This could be rather slow if the process holds a large number of locks.
- * Perhaps it could be sped up if we kept yet a third hashtable of per-
- * process lock information. However, for the normal case where a transaction
- * doesn't hold a large number of locks, keeping such a table would probably
- * be a net slowdown.
- */
-static void
-LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
-{
- SHM_QUEUE *procLocks = &(proc->procLocks);
- PROCLOCK *proclock;
-
- MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
-
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
- offsetof(PROCLOCK, procLink));
-
- while (proclock)
- {
- if (lockOffset == proclock->tag.lock)
- {
- LOCKMASK holdMask = proclock->holdMask;
- int i;
-
- for (i = 1; i < MAX_LOCKMODES; i++)
- {
- if (holdMask & LOCKBIT_ON(i))
- myHolding[i]++;
- }
- }
-
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
- offsetof(PROCLOCK, procLink));
- }
-}
-
-/*
* GrantLock -- update the lock and proclock data structures to show
* the lock request has been granted.
*
@@ -1086,7 +1036,7 @@ GrantAwaitedLock(void)
* WaitOnLock -- wait to acquire a lock
*
* Caller must have set MyProc->heldLocks to reflect locks already held
- * on the lockable object by this process (under all XIDs).
+ * on the lockable object by this process.
*
* The locktable's masterLock must be held at entry.
*/
@@ -1212,7 +1162,8 @@ RemoveFromWaitQueue(PGPROC *proc)
/*
* LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
- * release one 'lockmode' lock on it.
+ * release one 'lockmode' lock on it. Release a session lock if
+ * 'sessionLock' is true, else release a regular transaction lock.
*
* Side Effects: find any waiting processes that are now wakable,
* grant them their requested locks and awaken them.
@@ -1222,7 +1173,7 @@ RemoveFromWaitQueue(PGPROC *proc)
*/
bool
LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
- TransactionId xid, LOCKMODE lockmode)
+ LOCKMODE lockmode, bool sessionLock)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
@@ -1252,7 +1203,6 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
- localtag.xid = xid;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
@@ -1279,8 +1229,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
int i;
/* Session locks and user locks are not transactional */
- if (xid != InvalidTransactionId &&
- lockmethodid == DEFAULT_LOCKMETHOD)
+ if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD)
owner = CurrentResourceOwner;
else
owner = NULL;
@@ -1367,15 +1316,11 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* are held by the current process.
*
* Well, not necessarily *all* locks. The available behaviors are:
- *
- * allxids == true: release all locks regardless of transaction
- * affiliation.
- *
- * allxids == false: release all locks with Xid != 0
- * (zero is the Xid used for "session" locks).
+ * allLocks == true: release all locks including session locks.
+ * allLocks == false: release all non-session locks.
*/
void
-LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
+LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
HASH_SEQ_STATUS status;
SHM_QUEUE *procLocks = &(MyProc->procLocks);
@@ -1427,13 +1372,41 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
continue;
/*
- * Ignore locks with Xid=0 unless we are asked to release all
- * locks
+ * If we are asked to release all locks, we can just zap the
+ * entry. Otherwise, must scan to see if there are session locks.
+ * We assume there is at most one lockOwners entry for session locks.
*/
- if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
- && !allxids)
- continue;
+ if (!allLocks)
+ {
+ LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+
+ /* If it's above array position 0, move it down to 0 */
+ for (i = locallock->numLockOwners - 1; i > 0; i--)
+ {
+ if (lockOwners[i].owner == NULL)
+ {
+ lockOwners[0] = lockOwners[i];
+ break;
+ }
+ }
+
+ if (locallock->numLockOwners > 0 &&
+ lockOwners[0].owner == NULL &&
+ lockOwners[0].nLocks > 0)
+ {
+ /* Fix the locallock to show just the session locks */
+ locallock->nLocks = lockOwners[0].nLocks;
+ locallock->numLockOwners = 1;
+ /* We aren't deleting this locallock, so done */
+ continue;
+ }
+ }
+
+ /* Mark the proclock to show we need to release this lockmode */
+ if (locallock->nLocks > 0)
+ locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
+ /* And remove the locallock hashtable entry */
RemoveLocalLock(locallock);
}
@@ -1460,11 +1433,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
goto next_item;
/*
- * Ignore locks with Xid=0 unless we are asked to release all
- * locks
+ * In allLocks mode, force release of all locks even if locallock
+ * table had problems
*/
- if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
- && !allxids)
+ if (allLocks)
+ proclock->releaseMask = proclock->holdMask;
+ else
+ Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
+
+ /*
+ * Ignore items that have nothing to be released, unless they have
+ * holdMask == 0 and are therefore recyclable
+ */
+ if (proclock->releaseMask == 0 && proclock->holdMask != 0)
goto next_item;
PROCLOCK_PRINT("LockReleaseAll", proclock);
@@ -1475,22 +1456,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
Assert((proclock->holdMask & ~lock->grantMask) == 0);
/*
- * fix the general lock stats
+ * Release the previously-marked lock modes
*/
- if (proclock->holdMask)
+ for (i = 1; i <= numLockModes; i++)
{
- for (i = 1; i <= numLockModes; i++)
- {
- if (proclock->holdMask & LOCKBIT_ON(i))
- wakeupNeeded |= UnGrantLock(lock, i, proclock,
- lockMethodTable);
- }
+ if (proclock->releaseMask & LOCKBIT_ON(i))
+ wakeupNeeded |= UnGrantLock(lock, i, proclock,
+ lockMethodTable);
}
Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
Assert(lock->nGranted <= lock->nRequested);
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
- Assert(proclock->holdMask == 0);
+ proclock->releaseMask = 0;
/* CleanUpLock will wake up waiters if needed. */
CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
@@ -1528,8 +1506,6 @@ LockReleaseCurrentOwner(void)
/* Ignore items that must be nontransactional */
if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
continue;
- if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
- continue;
/* Scan to see if there are any locks belonging to current owner */
lockOwners = locallock->lockOwners;
@@ -1558,8 +1534,8 @@ LockReleaseCurrentOwner(void)
locallock->nLocks = 1;
if (!LockRelease(DEFAULT_LOCKMETHOD,
&locallock->tag.lock,
- locallock->tag.xid,
- locallock->tag.mode))
+ locallock->tag.mode,
+ false))
elog(WARNING, "LockReleaseCurrentOwner: failed??");
}
break;
@@ -1594,8 +1570,6 @@ LockReassignCurrentOwner(void)
/* Ignore items that must be nontransactional */
if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
continue;
- if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
- continue;
/*
* Scan to see if there are any locks belonging to current owner