summaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/ipc')
-rw-r--r--src/backend/storage/ipc/procarray.c189
-rw-r--r--src/backend/storage/ipc/sinvaladt.c54
2 files changed, 188 insertions, 55 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 51da9679f3..577f73a31f 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -23,7 +23,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.28 2007/07/01 02:22:23 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.29 2007/09/05 18:10:47 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -404,7 +404,7 @@ TransactionIdIsActive(TransactionId xid)
* This is also used to determine where to truncate pg_subtrans. allDbs
* must be TRUE for that case, and ignoreVacuum FALSE.
*
- * Note: we include the currently running xids in the set of considered xids.
+ * Note: we include all currently running xids in the set of considered xids.
* This ensures that if a just-started xact has not yet set its snapshot,
* when it does set the snapshot it cannot set xmin less than what we compute.
*/
@@ -416,15 +416,19 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
int index;
/*
- * Normally we start the min() calculation with our own XID. But if
- * called by checkpointer, we will not be inside a transaction, so use
- * next XID as starting point for min() calculation. (Note that if there
- * are no xacts running at all, that will be the subtrans truncation
- * point!)
+ * We need to initialize the MIN() calculation with something.
+ * ReadNewTransactionId() is guaranteed to work, but is relatively
+ * expensive due to locking; so first we try a couple of shortcuts.
+ * If we have a valid xmin in our own PGPROC entry, that will do;
+ * or if we have assigned ourselves an XID, that will do.
*/
- result = GetTopTransactionId();
+ result = MyProc ? MyProc->xmin : InvalidTransactionId;
if (!TransactionIdIsValid(result))
- result = ReadNewTransactionId();
+ {
+ result = GetTopTransactionIdIfAny();
+ if (!TransactionIdIsValid(result))
+ result = ReadNewTransactionId();
+ }
LWLockAcquire(ProcArrayLock, LW_SHARED);
@@ -440,23 +444,22 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
/* Fetch xid just once - see GetNewTransactionId */
TransactionId xid = proc->xid;
- if (TransactionIdIsNormal(xid))
- {
- /* First consider the transaction own's Xid */
- if (TransactionIdPrecedes(xid, result))
- result = xid;
-
- /*
- * Also consider the transaction's Xmin, if set.
- *
- * We must check both Xid and Xmin because there is a window
- * where an xact's Xid is set but Xmin isn't yet.
- */
- xid = proc->xmin;
- if (TransactionIdIsNormal(xid))
- if (TransactionIdPrecedes(xid, result))
- result = xid;
- }
+ /* First consider the transaction's own Xid, if any */
+ if (TransactionIdIsNormal(xid) &&
+ TransactionIdPrecedes(xid, result))
+ result = xid;
+
+ /*
+ * Also consider the transaction's Xmin, if set.
+ *
+ * We must check both Xid and Xmin because a transaction might
+ * have an Xmin but not (yet) an Xid; conversely, if it has
+ * an Xid, that could determine some not-yet-set Xmin.
+ */
+ xid = proc->xmin; /* Fetch just once */
+ if (TransactionIdIsNormal(xid) &&
+ TransactionIdPrecedes(xid, result))
+ result = xid;
}
}
@@ -545,8 +548,6 @@ GetSnapshotData(Snapshot snapshot, bool serializable)
errmsg("out of memory")));
}
- globalxmin = xmin = GetTopTransactionId();
-
/*
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* computing a serializable snapshot and therefore will be setting
@@ -557,6 +558,19 @@ GetSnapshotData(Snapshot snapshot, bool serializable)
* discussion just below). So it doesn't matter whether another backend
* concurrently doing GetSnapshotData or GetOldestXmin sees our xmin as
* set or not; he'd compute the same xmin for himself either way.
+ * (We are assuming here that xmin can be set and read atomically,
+ * just like xid.)
+ *
+ * There is a corner case in which the above argument doesn't work: if
+ * there isn't any oldest xact, ie, all xids in the array are invalid.
+ * In that case we will compute xmin as the result of ReadNewTransactionId,
+ * and since GetNewTransactionId doesn't take the ProcArrayLock, it's not
+ * so obvious that two backends with overlapping shared locks will get
+ * the same answer. But GetNewTransactionId is required to store the XID
+ * it assigned into the ProcArray before releasing XidGenLock. Therefore
+ * the backend that did ReadNewTransactionId later will see that XID in
+ * the array, and will compute the same xmin as the earlier one that saw
+ * no XIDs in the array.
*/
LWLockAcquire(ProcArrayLock, LW_SHARED);
@@ -589,6 +603,9 @@ GetSnapshotData(Snapshot snapshot, bool serializable)
xmax = ReadNewTransactionId();
+ /* initialize xmin calculation with xmax */
+ globalxmin = xmin = xmax;
+
/*
* Spin over procArray checking xid, xmin, and subxids. The goal is
* to gather all active xids, find the lowest xmin, and try to record
@@ -597,34 +614,40 @@ GetSnapshotData(Snapshot snapshot, bool serializable)
for (index = 0; index < arrayP->numProcs; index++)
{
PGPROC *proc = arrayP->procs[index];
+ TransactionId xid;
+
+ /* Ignore procs running LAZY VACUUM */
+ if (proc->inVacuum)
+ continue;
+
+ /* Update globalxmin to be the smallest valid xmin */
+ xid = proc->xmin; /* fetch just once */
+ if (TransactionIdIsNormal(xid) &&
+ TransactionIdPrecedes(xid, globalxmin))
+ globalxmin = xid;
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId xid = proc->xid;
+ xid = proc->xid;
/*
- * Ignore my own proc (dealt with my xid above), procs not running a
- * transaction, xacts started since we read the next transaction ID,
- * and xacts executing LAZY VACUUM. There's no need to store XIDs
- * above what we got from ReadNewTransactionId, since we'll treat them
- * as running anyway. We also assume that such xacts can't compute an
- * xmin older than ours, so they needn't be considered in computing
- * globalxmin.
+ * If the transaction has been assigned an xid < xmax we add it to the
+ * snapshot, and update xmin if necessary. There's no need to store
+ * XIDs above what we got from ReadNewTransactionId, since we'll treat
+ * them as running anyway. We don't bother to examine their subxids
+ * either.
+ *
+ * We don't include our own XID (if any) in the snapshot, but we must
+ * include it into xmin.
*/
- if (proc == MyProc ||
- !TransactionIdIsNormal(xid) ||
- TransactionIdFollowsOrEquals(xid, xmax) ||
- proc->inVacuum)
- continue;
-
- if (TransactionIdPrecedes(xid, xmin))
- xmin = xid;
- snapshot->xip[count++] = xid;
-
- /* Update globalxmin to be the smallest valid xmin */
- xid = proc->xmin;
if (TransactionIdIsNormal(xid))
- if (TransactionIdPrecedes(xid, globalxmin))
- globalxmin = xid;
+ {
+ if (TransactionIdFollowsOrEquals(xid, xmax))
+ continue;
+ if (proc != MyProc)
+ snapshot->xip[count++] = xid;
+ if (TransactionIdPrecedes(xid, xmin))
+ xmin = xid;
+ }
/*
* Save subtransaction XIDs if possible (if we've already overflowed,
@@ -635,8 +658,10 @@ GetSnapshotData(Snapshot snapshot, bool serializable)
* remove any. Hence it's important to fetch nxids just once. Should
* be safe to use memcpy, though. (We needn't worry about missing any
* xids added concurrently, because they must postdate xmax.)
+ *
+ * Again, our own XIDs are not included in the snapshot.
*/
- if (subcount >= 0)
+ if (subcount >= 0 && proc != MyProc)
{
if (proc->subxids.overflowed)
subcount = -1; /* overflowed */
@@ -818,6 +843,9 @@ BackendPidGetProc(int pid)
*
* Only main transaction Ids are considered. This function is mainly
* useful for determining what backend owns a lock.
+ *
+ * Beware that not every xact has an XID assigned. However, as long as you
+ * only call this using an XID found on disk, you're safe.
*/
int
BackendXidGetPid(TransactionId xid)
@@ -856,6 +884,63 @@ IsBackendPid(int pid)
return (BackendPidGetProc(pid) != NULL);
}
+
+/*
+ * GetCurrentVirtualXIDs -- returns an array of currently active VXIDs.
+ *
+ * The array is palloc'd and is terminated with an invalid VXID.
+ *
+ * If limitXmin is not InvalidTransactionId, we skip any backends
+ * with xmin >= limitXmin. Also, our own process is always skipped.
+ */
+VirtualTransactionId *
+GetCurrentVirtualXIDs(TransactionId limitXmin)
+{
+ VirtualTransactionId *vxids;
+ ProcArrayStruct *arrayP = procArray;
+ int count = 0;
+ int index;
+
+ /* allocate result space with room for a terminator */
+ vxids = (VirtualTransactionId *)
+ palloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ PGPROC *proc = arrayP->procs[index];
+ /* Fetch xmin just once - might change on us? */
+ TransactionId pxmin = proc->xmin;
+
+ if (proc == MyProc)
+ continue;
+
+ /*
+ * Note that InvalidTransactionId precedes all other XIDs, so a
+ * proc that hasn't set xmin yet will always be included.
+ */
+ if (!TransactionIdIsValid(limitXmin) ||
+ TransactionIdPrecedes(pxmin, limitXmin))
+ {
+ VirtualTransactionId vxid;
+
+ GET_VXID_FROM_PGPROC(vxid, *proc);
+ if (VirtualTransactionIdIsValid(vxid))
+ vxids[count++] = vxid;
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ /* add the terminator */
+ vxids[count].backendId = InvalidBackendId;
+ vxids[count].localTransactionId = InvalidLocalTransactionId;
+
+ return vxids;
+}
+
+
/*
* CountActiveBackends --- count backends (other than myself) that are in
* active transactions. This is used as a heuristic to decide if
@@ -885,7 +970,7 @@ CountActiveBackends(void)
if (proc->pid == 0)
continue; /* do not count prepared xacts */
if (proc->xid == InvalidTransactionId)
- continue; /* do not count if not in a transaction */
+ continue; /* do not count if no XID assigned */
if (proc->waitLock != NULL)
continue; /* do not count if blocked on a lock */
count++;
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 31c4a2dfad..99690d8b36 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.63 2007/01/05 22:19:38 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.64 2007/09/05 18:10:47 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -19,12 +19,15 @@
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pmsignal.h"
+#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
SISeg *shmInvalBuffer;
+static LocalTransactionId nextLocalTransactionId;
+
static void CleanupInvalidationState(int status, Datum arg);
static void SISetProcStateInvalid(SISeg *segP);
@@ -40,6 +43,8 @@ SInvalShmemSize(void)
size = offsetof(SISeg, procState);
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
+ size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
+
return size;
}
@@ -51,15 +56,21 @@ void
SIBufferInit(void)
{
SISeg *segP;
+ Size size;
int i;
bool found;
/* Allocate space in shared memory */
+ size = offsetof(SISeg, procState);
+ size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
+
shmInvalBuffer = segP = (SISeg *)
- ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
+ ShmemInitStruct("shmInvalBuffer", size, &found);
if (found)
return;
+ segP->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
+
/* Clear message counters, save size of procState array */
segP->minMsgNum = 0;
segP->maxMsgNum = 0;
@@ -69,11 +80,12 @@ SIBufferInit(void)
/* The buffer[] array is initially all unused, so we need not fill it */
- /* Mark all backends inactive */
+ /* Mark all backends inactive, and initialize nextLXID */
for (i = 0; i < segP->maxBackends; i++)
{
segP->procState[i].nextMsgNum = -1; /* inactive */
segP->procState[i].resetState = false;
+ segP->nextLXID[i] = InvalidLocalTransactionId;
}
}
@@ -128,9 +140,15 @@ SIBackendInit(SISeg *segP)
elog(DEBUG2, "my backend id is %d", MyBackendId);
#endif /* INVALIDDEBUG */
+ /* Advertise assigned backend ID in MyProc */
+ MyProc->backendId = MyBackendId;
+
/* Reduce free slot count */
segP->freeBackends--;
+ /* Fetch next local transaction ID into local memory */
+ nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
+
/* mark myself active, with all extant messages already read */
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
@@ -160,6 +178,9 @@ CleanupInvalidationState(int status, Datum arg)
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+ /* Update next local transaction ID for next holder of this backendID */
+ segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
+
/* Mark myself inactive */
segP->procState[MyBackendId - 1].nextMsgNum = -1;
segP->procState[MyBackendId - 1].resetState = false;
@@ -352,3 +373,30 @@ SIDelExpiredDataEntries(SISeg *segP)
}
}
}
+
+
+/*
+ * GetNextLocalTransactionId --- allocate a new LocalTransactionId
+ *
+ * We split VirtualTransactionIds into two parts so that it is possible
+ * to allocate a new one without any contention for shared memory, except
+ * for a bit of additional overhead during backend startup/shutdown.
+ * The high-order part of a VirtualTransactionId is a BackendId, and the
+ * low-order part is a LocalTransactionId, which we assign from a local
+ * counter. To avoid the risk of a VirtualTransactionId being reused
+ * within a short interval, successive procs occupying the same backend ID
+ * slot should use a consecutive sequence of local IDs, which is implemented
+ * by copying nextLocalTransactionId as seen above.
+ */
+LocalTransactionId
+GetNextLocalTransactionId(void)
+{
+ LocalTransactionId result;
+
+ /* loop to avoid returning InvalidLocalTransactionId at wraparound */
+ do {
+ result = nextLocalTransactionId++;
+ } while (!LocalTransactionIdIsValid(result));
+
+ return result;
+}