diff options
Diffstat (limited to 'src/backend/storage/ipc')
| -rw-r--r-- | src/backend/storage/ipc/procarray.c | 189 | ||||
| -rw-r--r-- | src/backend/storage/ipc/sinvaladt.c | 54 |
2 files changed, 188 insertions, 55 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 51da9679f3..577f73a31f 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -23,7 +23,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.28 2007/07/01 02:22:23 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.29 2007/09/05 18:10:47 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -404,7 +404,7 @@ TransactionIdIsActive(TransactionId xid) * This is also used to determine where to truncate pg_subtrans. allDbs * must be TRUE for that case, and ignoreVacuum FALSE. * - * Note: we include the currently running xids in the set of considered xids. + * Note: we include all currently running xids in the set of considered xids. * This ensures that if a just-started xact has not yet set its snapshot, * when it does set the snapshot it cannot set xmin less than what we compute. */ @@ -416,15 +416,19 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) int index; /* - * Normally we start the min() calculation with our own XID. But if - * called by checkpointer, we will not be inside a transaction, so use - * next XID as starting point for min() calculation. (Note that if there - * are no xacts running at all, that will be the subtrans truncation - * point!) + * We need to initialize the MIN() calculation with something. + * ReadNewTransactionId() is guaranteed to work, but is relatively + * expensive due to locking; so first we try a couple of shortcuts. + * If we have a valid xmin in our own PGPROC entry, that will do; + * or if we have assigned ourselves an XID, that will do. */ - result = GetTopTransactionId(); + result = MyProc ? MyProc->xmin : InvalidTransactionId; if (!TransactionIdIsValid(result)) - result = ReadNewTransactionId(); + { + result = GetTopTransactionIdIfAny(); + if (!TransactionIdIsValid(result)) + result = ReadNewTransactionId(); + } LWLockAcquire(ProcArrayLock, LW_SHARED); @@ -440,23 +444,22 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) /* Fetch xid just once - see GetNewTransactionId */ TransactionId xid = proc->xid; - if (TransactionIdIsNormal(xid)) - { - /* First consider the transaction own's Xid */ - if (TransactionIdPrecedes(xid, result)) - result = xid; - - /* - * Also consider the transaction's Xmin, if set. - * - * We must check both Xid and Xmin because there is a window - * where an xact's Xid is set but Xmin isn't yet. - */ - xid = proc->xmin; - if (TransactionIdIsNormal(xid)) - if (TransactionIdPrecedes(xid, result)) - result = xid; - } + /* First consider the transaction's own Xid, if any */ + if (TransactionIdIsNormal(xid) && + TransactionIdPrecedes(xid, result)) + result = xid; + + /* + * Also consider the transaction's Xmin, if set. + * + * We must check both Xid and Xmin because a transaction might + * have an Xmin but not (yet) an Xid; conversely, if it has + * an Xid, that could determine some not-yet-set Xmin. + */ + xid = proc->xmin; /* Fetch just once */ + if (TransactionIdIsNormal(xid) && + TransactionIdPrecedes(xid, result)) + result = xid; } } @@ -545,8 +548,6 @@ GetSnapshotData(Snapshot snapshot, bool serializable) errmsg("out of memory"))); } - globalxmin = xmin = GetTopTransactionId(); - /* * It is sufficient to get shared lock on ProcArrayLock, even if we are * computing a serializable snapshot and therefore will be setting @@ -557,6 +558,19 @@ GetSnapshotData(Snapshot snapshot, bool serializable) * discussion just below). So it doesn't matter whether another backend * concurrently doing GetSnapshotData or GetOldestXmin sees our xmin as * set or not; he'd compute the same xmin for himself either way. + * (We are assuming here that xmin can be set and read atomically, + * just like xid.) + * + * There is a corner case in which the above argument doesn't work: if + * there isn't any oldest xact, ie, all xids in the array are invalid. + * In that case we will compute xmin as the result of ReadNewTransactionId, + * and since GetNewTransactionId doesn't take the ProcArrayLock, it's not + * so obvious that two backends with overlapping shared locks will get + * the same answer. But GetNewTransactionId is required to store the XID + * it assigned into the ProcArray before releasing XidGenLock. Therefore + * the backend that did ReadNewTransactionId later will see that XID in + * the array, and will compute the same xmin as the earlier one that saw + * no XIDs in the array. */ LWLockAcquire(ProcArrayLock, LW_SHARED); @@ -589,6 +603,9 @@ GetSnapshotData(Snapshot snapshot, bool serializable) xmax = ReadNewTransactionId(); + /* initialize xmin calculation with xmax */ + globalxmin = xmin = xmax; + /* * Spin over procArray checking xid, xmin, and subxids. The goal is * to gather all active xids, find the lowest xmin, and try to record @@ -597,34 +614,40 @@ GetSnapshotData(Snapshot snapshot, bool serializable) for (index = 0; index < arrayP->numProcs; index++) { PGPROC *proc = arrayP->procs[index]; + TransactionId xid; + + /* Ignore procs running LAZY VACUUM */ + if (proc->inVacuum) + continue; + + /* Update globalxmin to be the smallest valid xmin */ + xid = proc->xmin; /* fetch just once */ + if (TransactionIdIsNormal(xid) && + TransactionIdPrecedes(xid, globalxmin)) + globalxmin = xid; /* Fetch xid just once - see GetNewTransactionId */ - TransactionId xid = proc->xid; + xid = proc->xid; /* - * Ignore my own proc (dealt with my xid above), procs not running a - * transaction, xacts started since we read the next transaction ID, - * and xacts executing LAZY VACUUM. There's no need to store XIDs - * above what we got from ReadNewTransactionId, since we'll treat them - * as running anyway. We also assume that such xacts can't compute an - * xmin older than ours, so they needn't be considered in computing - * globalxmin. + * If the transaction has been assigned an xid < xmax we add it to the + * snapshot, and update xmin if necessary. There's no need to store + * XIDs above what we got from ReadNewTransactionId, since we'll treat + * them as running anyway. We don't bother to examine their subxids + * either. + * + * We don't include our own XID (if any) in the snapshot, but we must + * include it into xmin. */ - if (proc == MyProc || - !TransactionIdIsNormal(xid) || - TransactionIdFollowsOrEquals(xid, xmax) || - proc->inVacuum) - continue; - - if (TransactionIdPrecedes(xid, xmin)) - xmin = xid; - snapshot->xip[count++] = xid; - - /* Update globalxmin to be the smallest valid xmin */ - xid = proc->xmin; if (TransactionIdIsNormal(xid)) - if (TransactionIdPrecedes(xid, globalxmin)) - globalxmin = xid; + { + if (TransactionIdFollowsOrEquals(xid, xmax)) + continue; + if (proc != MyProc) + snapshot->xip[count++] = xid; + if (TransactionIdPrecedes(xid, xmin)) + xmin = xid; + } /* * Save subtransaction XIDs if possible (if we've already overflowed, @@ -635,8 +658,10 @@ GetSnapshotData(Snapshot snapshot, bool serializable) * remove any. Hence it's important to fetch nxids just once. Should * be safe to use memcpy, though. (We needn't worry about missing any * xids added concurrently, because they must postdate xmax.) + * + * Again, our own XIDs are not included in the snapshot. */ - if (subcount >= 0) + if (subcount >= 0 && proc != MyProc) { if (proc->subxids.overflowed) subcount = -1; /* overflowed */ @@ -818,6 +843,9 @@ BackendPidGetProc(int pid) * * Only main transaction Ids are considered. This function is mainly * useful for determining what backend owns a lock. + * + * Beware that not every xact has an XID assigned. However, as long as you + * only call this using an XID found on disk, you're safe. */ int BackendXidGetPid(TransactionId xid) @@ -856,6 +884,63 @@ IsBackendPid(int pid) return (BackendPidGetProc(pid) != NULL); } + +/* + * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs. + * + * The array is palloc'd and is terminated with an invalid VXID. + * + * If limitXmin is not InvalidTransactionId, we skip any backends + * with xmin >= limitXmin. Also, our own process is always skipped. + */ +VirtualTransactionId * +GetCurrentVirtualXIDs(TransactionId limitXmin) +{ + VirtualTransactionId *vxids; + ProcArrayStruct *arrayP = procArray; + int count = 0; + int index; + + /* allocate result space with room for a terminator */ + vxids = (VirtualTransactionId *) + palloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1)); + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + PGPROC *proc = arrayP->procs[index]; + /* Fetch xmin just once - might change on us? */ + TransactionId pxmin = proc->xmin; + + if (proc == MyProc) + continue; + + /* + * Note that InvalidTransactionId precedes all other XIDs, so a + * proc that hasn't set xmin yet will always be included. + */ + if (!TransactionIdIsValid(limitXmin) || + TransactionIdPrecedes(pxmin, limitXmin)) + { + VirtualTransactionId vxid; + + GET_VXID_FROM_PGPROC(vxid, *proc); + if (VirtualTransactionIdIsValid(vxid)) + vxids[count++] = vxid; + } + } + + LWLockRelease(ProcArrayLock); + + /* add the terminator */ + vxids[count].backendId = InvalidBackendId; + vxids[count].localTransactionId = InvalidLocalTransactionId; + + return vxids; +} + + /* * CountActiveBackends --- count backends (other than myself) that are in * active transactions. This is used as a heuristic to decide if @@ -885,7 +970,7 @@ CountActiveBackends(void) if (proc->pid == 0) continue; /* do not count prepared xacts */ if (proc->xid == InvalidTransactionId) - continue; /* do not count if not in a transaction */ + continue; /* do not count if no XID assigned */ if (proc->waitLock != NULL) continue; /* do not count if blocked on a lock */ count++; diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index 31c4a2dfad..99690d8b36 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.63 2007/01/05 22:19:38 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.64 2007/09/05 18:10:47 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -19,12 +19,15 @@ #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/pmsignal.h" +#include "storage/proc.h" #include "storage/shmem.h" #include "storage/sinvaladt.h" SISeg *shmInvalBuffer; +static LocalTransactionId nextLocalTransactionId; + static void CleanupInvalidationState(int status, Datum arg); static void SISetProcStateInvalid(SISeg *segP); @@ -40,6 +43,8 @@ SInvalShmemSize(void) size = offsetof(SISeg, procState); size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); + size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends)); + return size; } @@ -51,15 +56,21 @@ void SIBufferInit(void) { SISeg *segP; + Size size; int i; bool found; /* Allocate space in shared memory */ + size = offsetof(SISeg, procState); + size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); + shmInvalBuffer = segP = (SISeg *) - ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found); + ShmemInitStruct("shmInvalBuffer", size, &found); if (found) return; + segP->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends); + /* Clear message counters, save size of procState array */ segP->minMsgNum = 0; segP->maxMsgNum = 0; @@ -69,11 +80,12 @@ SIBufferInit(void) /* The buffer[] array is initially all unused, so we need not fill it */ - /* Mark all backends inactive */ + /* Mark all backends inactive, and initialize nextLXID */ for (i = 0; i < segP->maxBackends; i++) { segP->procState[i].nextMsgNum = -1; /* inactive */ segP->procState[i].resetState = false; + segP->nextLXID[i] = InvalidLocalTransactionId; } } @@ -128,9 +140,15 @@ SIBackendInit(SISeg *segP) elog(DEBUG2, "my backend id is %d", MyBackendId); #endif /* INVALIDDEBUG */ + /* Advertise assigned backend ID in MyProc */ + MyProc->backendId = MyBackendId; + /* Reduce free slot count */ segP->freeBackends--; + /* Fetch next local transaction ID into local memory */ + nextLocalTransactionId = segP->nextLXID[MyBackendId - 1]; + /* mark myself active, with all extant messages already read */ stateP->nextMsgNum = segP->maxMsgNum; stateP->resetState = false; @@ -160,6 +178,9 @@ CleanupInvalidationState(int status, Datum arg) LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + /* Update next local transaction ID for next holder of this backendID */ + segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId; + /* Mark myself inactive */ segP->procState[MyBackendId - 1].nextMsgNum = -1; segP->procState[MyBackendId - 1].resetState = false; @@ -352,3 +373,30 @@ SIDelExpiredDataEntries(SISeg *segP) } } } + + +/* + * GetNextLocalTransactionId --- allocate a new LocalTransactionId + * + * We split VirtualTransactionIds into two parts so that it is possible + * to allocate a new one without any contention for shared memory, except + * for a bit of additional overhead during backend startup/shutdown. + * The high-order part of a VirtualTransactionId is a BackendId, and the + * low-order part is a LocalTransactionId, which we assign from a local + * counter. To avoid the risk of a VirtualTransactionId being reused + * within a short interval, successive procs occupying the same backend ID + * slot should use a consecutive sequence of local IDs, which is implemented + * by copying nextLocalTransactionId as seen above. + */ +LocalTransactionId +GetNextLocalTransactionId(void) +{ + LocalTransactionId result; + + /* loop to avoid returning InvalidLocalTransactionId at wraparound */ + do { + result = nextLocalTransactionId++; + } while (!LocalTransactionIdIsValid(result)); + + return result; +} |
