summaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/heap/heapam.c594
-rw-r--r--src/backend/access/transam/Makefile4
-rw-r--r--src/backend/access/transam/multixact.c1557
-rw-r--r--src/backend/access/transam/xact.c12
-rw-r--r--src/backend/access/transam/xlog.c65
5 files changed, 2115 insertions, 117 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 605ed62942..ee604df2ca 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.187 2005/04/14 20:03:22 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -40,12 +40,14 @@
#include "access/heapam.h"
#include "access/hio.h"
+#include "access/multixact.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "miscadmin.h"
+#include "storage/sinval.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "pgstat.h"
@@ -1238,30 +1240,81 @@ l1:
}
else if (result == HeapTupleBeingUpdated && wait)
{
- TransactionId xwait = HeapTupleHeaderGetXmax(tp.t_data);
-
- /* sleep until concurrent transaction ends */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
-
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- if (!TransactionIdDidCommit(xwait))
- goto l1;
+ TransactionId xwait;
+ uint16 infomask;
/*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
+ * Sleep until concurrent transaction ends. Note that we don't care
+ * if the locker has an exclusive or shared lock, because we need
+ * exclusive.
*/
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), xwait))
- goto l1;
- if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
+
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(tp.t_data);
+ infomask = tp.t_data->t_infomask;
+
+ if (infomask & HEAP_XMAX_IS_MULTI)
{
- tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(buffer);
+ /* wait for multixact */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ MultiXactIdWait((MultiXactId) xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * If xwait had just locked the tuple then some other xact could
+ * update this tuple before we get to this point. Check for xmax
+ * change, and start over if so.
+ */
+ if (!(tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+ xwait))
+ goto l1;
+
+ /*
+ * You might think the multixact is necessarily done here, but
+ * not so: it could have surviving members, namely our own xact
+ * or other subxacts of this backend. It is legal for us to
+ * delete the tuple in either case, however (the latter case is
+ * essentially a situation of upgrading our former shared lock
+ * to exclusive). We don't bother changing the on-disk hint bits
+ * since we are about to overwrite the xmax altogether.
+ */
}
- /* if tuple was marked for update but not updated... */
- if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
+ else
+ {
+ /* wait for regular transaction to end */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ XactLockTableWait(xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * xwait is done, but if xwait had just locked the tuple then some
+ * other xact could update this tuple before we get to this point.
+ * Check for xmax change, and start over if so.
+ */
+ if ((tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+ xwait))
+ goto l1;
+
+ /* Otherwise we can mark it committed or aborted */
+ if (!(tp.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID)))
+ {
+ if (TransactionIdDidCommit(xwait))
+ tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+ else
+ tp.t_data->t_infomask |= HEAP_XMAX_INVALID;
+ SetBufferCommitInfoNeedsSave(buffer);
+ }
+ }
+
+ /*
+ * We may overwrite if previous xmax aborted, or if it committed
+ * but only locked the tuple without updating it.
+ */
+ if (tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_IS_LOCKED))
result = HeapTupleMayBeUpdated;
else
result = HeapTupleUpdated;
@@ -1290,7 +1343,8 @@ l1:
/* store transaction information of xact deleting the tuple */
tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(tp.t_data, xid);
HeapTupleHeaderSetCmax(tp.t_data, cid);
@@ -1465,30 +1519,81 @@ l2:
}
else if (result == HeapTupleBeingUpdated && wait)
{
- TransactionId xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
-
- /* sleep until concurrent transaction ends */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
-
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- if (!TransactionIdDidCommit(xwait))
- goto l2;
+ TransactionId xwait;
+ uint16 infomask;
/*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
+ * Sleep until concurrent transaction ends. Note that we don't care
+ * if the locker has an exclusive or shared lock, because we need
+ * exclusive.
*/
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), xwait))
- goto l2;
- if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
+
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
+ infomask = oldtup.t_data->t_infomask;
+
+ if (infomask & HEAP_XMAX_IS_MULTI)
{
- oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(buffer);
+ /* wait for multixact */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ MultiXactIdWait((MultiXactId) xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * If xwait had just locked the tuple then some other xact could
+ * update this tuple before we get to this point. Check for xmax
+ * change, and start over if so.
+ */
+ if (!(oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
+ xwait))
+ goto l2;
+
+ /*
+ * You might think the multixact is necessarily done here, but
+ * not so: it could have surviving members, namely our own xact
+ * or other subxacts of this backend. It is legal for us to
+ * update the tuple in either case, however (the latter case is
+ * essentially a situation of upgrading our former shared lock
+ * to exclusive). We don't bother changing the on-disk hint bits
+ * since we are about to overwrite the xmax altogether.
+ */
}
- /* if tuple was marked for update but not updated... */
- if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
+ else
+ {
+ /* wait for regular transaction to end */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ XactLockTableWait(xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * xwait is done, but if xwait had just locked the tuple then some
+ * other xact could update this tuple before we get to this point.
+ * Check for xmax change, and start over if so.
+ */
+ if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
+ xwait))
+ goto l2;
+
+ /* Otherwise we can mark it committed or aborted */
+ if (!(oldtup.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID)))
+ {
+ if (TransactionIdDidCommit(xwait))
+ oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+ else
+ oldtup.t_data->t_infomask |= HEAP_XMAX_INVALID;
+ SetBufferCommitInfoNeedsSave(buffer);
+ }
+ }
+
+ /*
+ * We may overwrite if previous xmax aborted, or if it committed
+ * but only locked the tuple without updating it.
+ */
+ if (oldtup.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_IS_LOCKED))
result = HeapTupleMayBeUpdated;
else
result = HeapTupleUpdated;
@@ -1556,7 +1661,8 @@ l2:
{
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(oldtup.t_data, xid);
HeapTupleHeaderSetCmax(oldtup.t_data, cid);
@@ -1642,7 +1748,8 @@ l2:
{
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(oldtup.t_data, xid);
HeapTupleHeaderSetCmax(oldtup.t_data, cid);
@@ -1739,17 +1846,18 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
}
/*
- * heap_mark4update - mark a tuple for update
+ * heap_lock_tuple - lock a tuple in shared or exclusive mode
*/
HTSU_Result
-heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
- CommandId cid)
+heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
+ CommandId cid, LockTupleMode mode)
{
- TransactionId xid = GetCurrentTransactionId();
+ TransactionId xid;
ItemPointer tid = &(tuple->t_self);
ItemId lp;
PageHeader dp;
HTSU_Result result;
+ uint16 new_infomask;
*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -1767,38 +1875,93 @@ l3:
{
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(*buffer);
- elog(ERROR, "attempted to mark4update invisible tuple");
+ elog(ERROR, "attempted to lock invisible tuple");
}
else if (result == HeapTupleBeingUpdated)
{
- TransactionId xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+ if (mode == LockTupleShared &&
+ (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
+ result = HeapTupleMayBeUpdated;
+ else
+ {
+ TransactionId xwait;
+ uint16 infomask;
- /* sleep until concurrent transaction ends */
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
+ /*
+ * Sleep until concurrent transaction ends.
+ */
- LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
- if (!TransactionIdDidCommit(xwait))
- goto l3;
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+ infomask = tuple->t_data->t_infomask;
- /*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
- */
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), xwait))
- goto l3;
- if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
- {
- tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(*buffer);
+ if (infomask & HEAP_XMAX_IS_MULTI)
+ {
+ /* wait for multixact */
+ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ MultiXactIdWait((MultiXactId) xwait);
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * If xwait had just locked the tuple then some other xact
+ * could update this tuple before we get to this point.
+ * Check for xmax change, and start over if so.
+ */
+ if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+ xwait))
+ goto l3;
+
+ /*
+ * You might think the multixact is necessarily done here, but
+ * not so: it could have surviving members, namely our own xact
+ * or other subxacts of this backend. It is legal for us to
+ * lock the tuple in either case, however. We don't bother
+ * changing the on-disk hint bits since we are about to
+ * overwrite the xmax altogether.
+ */
+ }
+ else
+ {
+ /* wait for regular transaction to end */
+ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ XactLockTableWait(xwait);
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * xwait is done, but if xwait had just locked the tuple then
+ * some other xact could update this tuple before we get to
+ * this point. Check for xmax change, and start over if so.
+ */
+ if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+ xwait))
+ goto l3;
+
+ /* Otherwise we can mark it committed or aborted */
+ if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID)))
+ {
+ if (TransactionIdDidCommit(xwait))
+ tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+ else
+ tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
+ SetBufferCommitInfoNeedsSave(*buffer);
+ }
+ }
+
+ /*
+ * We may lock if previous xmax aborted, or if it committed
+ * but only locked the tuple without updating it.
+ */
+ if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_IS_LOCKED))
+ result = HeapTupleMayBeUpdated;
+ else
+ result = HeapTupleUpdated;
}
- /* if tuple was marked for update but not updated... */
- if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
- result = HeapTupleMayBeUpdated;
- else
- result = HeapTupleUpdated;
}
+
if (result != HeapTupleMayBeUpdated)
{
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
@@ -1808,21 +1971,173 @@ l3:
}
/*
- * XLOG stuff: no logging is required as long as we have no
- * savepoints. For savepoints private log could be used...
+ * Compute the new xmax and infomask to store into the tuple. Note we
+ * do not modify the tuple just yet, because that would leave it in the
+ * wrong state if multixact.c elogs.
*/
- PageSetTLI(BufferGetPage(*buffer), ThisTimeLineID);
+ xid = GetCurrentTransactionId();
+
+ new_infomask = tuple->t_data->t_infomask;
+
+ new_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
+ HEAP_MOVED);
+
+ if (mode == LockTupleShared)
+ {
+ TransactionId xmax = HeapTupleHeaderGetXmax(tuple->t_data);
+ uint16 old_infomask = tuple->t_data->t_infomask;
+
+ /*
+ * If this is the first acquisition of a shared lock in the current
+ * transaction, set my per-backend OldestMemberMXactId setting.
+ * We can be certain that the transaction will never become a
+ * member of any older MultiXactIds than that. (We have to do this
+ * even if we end up just using our own TransactionId below, since
+ * some other backend could incorporate our XID into a MultiXact
+ * immediately afterwards.)
+ */
+ MultiXactIdSetOldestMember();
+
+ new_infomask |= HEAP_XMAX_SHARED_LOCK;
+
+ /*
+ * Check to see if we need a MultiXactId because there are multiple
+ * lockers.
+ *
+ * HeapTupleSatisfiesUpdate will have set the HEAP_XMAX_INVALID
+ * bit if the xmax was a MultiXactId but it was not running anymore.
+ * There is a race condition, which is that the MultiXactId may have
+ * finished since then, but that uncommon case is handled within
+ * MultiXactIdExpand.
+ *
+ * There is a similar race condition possible when the old xmax was
+ * a regular TransactionId. We test TransactionIdIsInProgress again
+ * just to narrow the window, but it's still possible to end up
+ * creating an unnecessary MultiXactId. Fortunately this is harmless.
+ */
+ if (!(old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED)))
+ {
+ if (old_infomask & HEAP_XMAX_IS_MULTI)
+ {
+ /*
+ * If the XMAX is already a MultiXactId, then we need to
+ * expand it to include our own TransactionId.
+ */
+ xid = MultiXactIdExpand(xmax, true, xid);
+ new_infomask |= HEAP_XMAX_IS_MULTI;
+ }
+ else if (TransactionIdIsInProgress(xmax))
+ {
+ if (TransactionIdEquals(xmax, xid))
+ {
+ /*
+ * If the old locker is ourselves, we'll just mark the
+ * tuple again with our own TransactionId. However we
+ * have to consider the possibility that we had
+ * exclusive rather than shared lock before --- if so,
+ * be careful to preserve the exclusivity of the lock.
+ */
+ if (!(old_infomask & HEAP_XMAX_SHARED_LOCK))
+ {
+ new_infomask &= ~HEAP_XMAX_SHARED_LOCK;
+ new_infomask |= HEAP_XMAX_EXCL_LOCK;
+ mode = LockTupleExclusive;
+ }
+ }
+ else
+ {
+ /*
+ * If the Xmax is a valid TransactionId, then we need to
+ * create a new MultiXactId that includes both the old
+ * locker and our own TransactionId.
+ */
+ xid = MultiXactIdExpand(xmax, false, xid);
+ new_infomask |= HEAP_XMAX_IS_MULTI;
+ }
+ }
+ else
+ {
+ /*
+ * Can get here iff HeapTupleSatisfiesUpdate saw the old
+ * xmax as running, but it finished before
+ * TransactionIdIsInProgress() got to run. Treat it like
+ * there's no locker in the tuple.
+ */
+ }
+ }
+ else
+ {
+ /*
+ * There was no previous locker, so just insert our own
+ * TransactionId.
+ */
+ }
+ }
+ else
+ {
+ /* We want an exclusive lock on the tuple */
+ new_infomask |= HEAP_XMAX_EXCL_LOCK;
+ }
+
+ START_CRIT_SECTION();
- /* store transaction information of xact marking the tuple */
- tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID |
- HEAP_MOVED);
- tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;
+ /*
+ * Store transaction information of xact locking the tuple.
+ *
+ * Note: our CID is meaningless if storing a MultiXactId, but no harm
+ * in storing it anyway.
+ */
+ tuple->t_data->t_infomask = new_infomask;
HeapTupleHeaderSetXmax(tuple->t_data, xid);
HeapTupleHeaderSetCmax(tuple->t_data, cid);
/* Make sure there is no forward chain link in t_ctid */
tuple->t_data->t_ctid = *tid;
+ /*
+ * XLOG stuff. You might think that we don't need an XLOG record because
+ * there is no state change worth restoring after a crash. You would be
+ * wrong however: we have just written either a TransactionId or a
+ * MultiXactId that may never have been seen on disk before, and we need
+ * to make sure that there are XLOG entries covering those ID numbers.
+ * Else the same IDs might be re-used after a crash, which would be
+ * disastrous if this page made it to disk before the crash. Essentially
+ * we have to enforce the WAL log-before-data rule even in this case.
+ */
+ if (!relation->rd_istemp)
+ {
+ xl_heap_lock xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata[2];
+
+ xlrec.target.node = relation->rd_node;
+ xlrec.target.tid = tuple->t_self;
+ xlrec.shared_lock = (mode == LockTupleShared);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfHeapLock;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].buffer = *buffer;
+ rdata[1].data = NULL;
+ rdata[1].len = 0;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
+
+ PageSetLSN(dp, recptr);
+ PageSetTLI(dp, ThisTimeLineID);
+ }
+ else
+ {
+ /* No XLOG record, but still need to flag that XID exists on disk */
+ MyXactMadeTempRelUpdate = true;
+ }
+
+ END_CRIT_SECTION();
+
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
WriteNoReleaseBuffer(*buffer);
@@ -1832,17 +2147,6 @@ l3:
/* ----------------
* heap_markpos - mark scan position
- *
- * Note:
- * Should only one mark be maintained per scan at one time.
- * Check if this can be done generally--say calls to get the
- * next/previous tuple and NEVER pass struct scandesc to the
- * user AM's. Now, the mark is sent to the executor for safekeeping.
- * Probably can store this info into a GENERAL scan structure.
- *
- * May be best to change this call to store the marked position
- * (up to 2?) in the scan structure itself.
- * Fix to use the proper caching structure.
* ----------------
*/
void
@@ -1858,19 +2162,6 @@ heap_markpos(HeapScanDesc scan)
/* ----------------
* heap_restrpos - restore position to marked location
- *
- * Note: there are bad side effects here. If we were past the end
- * of a relation when heapmarkpos is called, then if the relation is
- * extended via insert, then the next call to heaprestrpos will set
- * cause the added tuples to be visible when the scan continues.
- * Problems also arise if the TID's are rearranged!!!
- *
- * XXX might be better to do direct access instead of
- * using the generality of heapgettup().
- *
- * XXX It is very possible that when a scan is restored, that a tuple
- * XXX which previously qualified may fail for time range purposes, unless
- * XXX some form of locking exists (ie., portals currently can act funny.
* ----------------
*/
void
@@ -1996,8 +2287,7 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
{
TransactionId xid[2]; /* xmax, xmin */
- if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE))
+ if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED))
xid[0] = InvalidTransactionId;
else
xid[0] = HeapTupleHeaderGetXmax(newtup->t_data);
@@ -2185,7 +2475,8 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(htup, record->xl_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId);
@@ -2365,7 +2656,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
{
htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(htup, record->xl_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId);
@@ -2487,6 +2779,82 @@ newsame:;
}
+static void
+heap_xlog_lock(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+
+ if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
+ return;
+
+ reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
+
+ if (!RelationIsValid(reln))
+ return;
+
+ buffer = XLogReadBuffer(false, reln,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "heap_lock_%sdo: no block", (redo) ? "re" : "un");
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ elog(PANIC, "heap_lock_%sdo: uninitialized page", (redo) ? "re" : "un");
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
+ }
+ }
+ else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied
+ * ?! */
+ elog(PANIC, "heap_lock_undo: bad page LSN");
+
+ offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
+ elog(PANIC, "heap_lock_%sdo: invalid lp", (redo) ? "re" : "un");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ if (redo)
+ {
+ /*
+ * Presently, we don't bother to restore the locked state, but
+ * just set the XMAX_INVALID bit.
+ */
+ htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
+ HEAP_MOVED);
+ htup->t_infomask |= HEAP_XMAX_INVALID;
+ HeapTupleHeaderSetXmax(htup, record->xl_xid);
+ HeapTupleHeaderSetCmax(htup, FirstCommandId);
+ /* Make sure there is no forward chain link in t_ctid */
+ htup->t_ctid = xlrec->target.tid;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(buffer);
+ return;
+ }
+
+ elog(PANIC, "heap_lock_undo: unimplemented");
+}
+
void
heap_redo(XLogRecPtr lsn, XLogRecord *record)
{
@@ -2505,6 +2873,8 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
heap_xlog_clean(true, lsn, record);
else if (info == XLOG_HEAP_NEWPAGE)
heap_xlog_newpage(true, lsn, record);
+ else if (info == XLOG_HEAP_LOCK)
+ heap_xlog_lock(true, lsn, record);
else
elog(PANIC, "heap_redo: unknown op code %u", info);
}
@@ -2527,6 +2897,8 @@ heap_undo(XLogRecPtr lsn, XLogRecord *record)
heap_xlog_clean(false, lsn, record);
else if (info == XLOG_HEAP_NEWPAGE)
heap_xlog_newpage(false, lsn, record);
+ else if (info == XLOG_HEAP_LOCK)
+ heap_xlog_lock(false, lsn, record);
else
elog(PANIC, "heap_undo: unknown op code %u", info);
}
@@ -2589,6 +2961,16 @@ heap_desc(char *buf, uint8 xl_info, char *rec)
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->blkno);
}
+ else if (info == XLOG_HEAP_LOCK)
+ {
+ xl_heap_lock *xlrec = (xl_heap_lock *) rec;
+
+ if (xlrec->shared_lock)
+ strcat(buf, "shared_lock: ");
+ else
+ strcat(buf, "exclusive_lock: ");
+ out_target(buf, &(xlrec->target));
+ }
else
strcat(buf, "UNKNOWN");
}
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index fe740a045f..295ecf9e14 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -4,7 +4,7 @@
# Makefile for access/transam
#
# IDENTIFICATION
-# $PostgreSQL: pgsql/src/backend/access/transam/Makefile,v 1.19 2004/07/01 00:49:42 tgl Exp $
+# $PostgreSQL: pgsql/src/backend/access/transam/Makefile,v 1.20 2005/04/28 21:47:10 tgl Exp $
#
#-------------------------------------------------------------------------
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o
+OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o
all: SUBSYS.o
diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
new file mode 100644
index 0000000000..de1a88205f
--- /dev/null
+++ b/src/backend/access/transam/multixact.c
@@ -0,0 +1,1557 @@
+/*-------------------------------------------------------------------------
+ *
+ * multixact.c
+ * PostgreSQL multi-transaction-log manager
+ *
+ * The pg_multixact manager is a pg_clog-like manager that stores an array
+ * of TransactionIds for each MultiXactId. It is a fundamental part of the
+ * shared-row-lock implementation. A share-locked tuple stores a
+ * MultiXactId in its Xmax, and a transaction that needs to wait for the
+ * tuple to be unlocked can sleep on the potentially-several TransactionIds
+ * that compose the MultiXactId.
+ *
+ * We use two SLRU areas, one for storing the offsets on which the data
+ * starts for each MultiXactId in the other one. This trick allows us to
+ * store variable length arrays of TransactionIds. (We could alternatively
+ * use one area containing counts and TransactionIds, with valid MultiXactId
+ * values pointing at slots containing counts; but that way seems less robust
+ * since it would get completely confused if someone inquired about a bogus
+ * MultiXactId that pointed to an intermediate slot containing an XID.)
+ *
+ * This code is based on subtrans.c; see it for additional discussion.
+ * Like the subtransaction manager, we only need to remember multixact
+ * information for currently-open transactions. Thus, there is
+ * no need to preserve data over a crash and restart.
+ *
+ * The only XLOG interaction we need to take care of is that generated
+ * MultiXactId values must continue to increase across a system crash.
+ * Thus we log the acquisition of MultiXactIds in groups, in the same fashion
+ * as we do for Oids (see XLogPutNextMultiXactId).
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL: pgsql/src/backend/access/transam/multixact.c,v 1.1 2005/04/28 21:47:10 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/multixact.h"
+#include "access/slru.h"
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+#include "storage/backendid.h"
+#include "storage/lmgr.h"
+#include "storage/sinval.h"
+
+
+/*
+ * Defines for MultiXactOffset page sizes. A page is the same BLCKSZ as is
+ * used everywhere else in Postgres.
+ *
+ * Note: because both uint32 and TransactionIds are 32 bits and wrap around at
+ * 0xFFFFFFFF, MultiXact page numbering also wraps around at
+ * 0xFFFFFFFF/MULTIXACT_*_PER_PAGE, and segment numbering at
+ * 0xFFFFFFFF/MULTIXACT_*_PER_PAGE/SLRU_SEGMENTS_PER_PAGE. We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateMultiXact
+ * (see MultiXact{Offset,Member}PagePrecedes).
+ */
+
+/* We need four bytes per offset and also four bytes per member */
+#define MULTIXACT_OFFSETS_PER_PAGE (BLCKSZ / sizeof(uint32))
+#define MULTIXACT_MEMBERS_PER_PAGE (BLCKSZ / sizeof(TransactionId))
+
+#define MultiXactIdToOffsetPage(xid) \
+ ((xid) / (uint32) MULTIXACT_OFFSETS_PER_PAGE)
+#define MultiXactIdToOffsetEntry(xid) \
+ ((xid) % (uint32) MULTIXACT_OFFSETS_PER_PAGE)
+
+#define MXOffsetToMemberPage(xid) \
+ ((xid) / (TransactionId) MULTIXACT_MEMBERS_PER_PAGE)
+#define MXOffsetToMemberEntry(xid) \
+ ((xid) % (TransactionId) MULTIXACT_MEMBERS_PER_PAGE)
+
+/* Arbitrary number of MultiXactIds to allocate at each XLog call */
+#define MXACT_PREFETCH 8192
+
+/*
+ * Links to shared-memory data structures for MultiXact control
+ */
+static SlruCtlData MultiXactOffsetCtlData;
+static SlruCtlData MultiXactMemberCtlData;
+
+#define MultiXactOffsetCtl (&MultiXactOffsetCtlData)
+#define MultiXactMemberCtl (&MultiXactMemberCtlData)
+
+/*
+ * MultiXact state shared across all backends. All this state is protected
+ * by MultiXactGenLock. (We also use MultiXactOffsetControlLock and
+ * MultiXactMemberControlLock to guard accesses to the two sets of SLRU
+ * buffers. For concurrency's sake, we avoid holding more than one of these
+ * locks at a time.)
+ */
+typedef struct MultiXactStateData
+{
+ /* next-to-be-assigned MultiXactId */
+ MultiXactId nextMXact;
+
+ /* MultiXactIds we have left before logging more */
+ uint32 mXactCount;
+
+ /* next-to-be-assigned offset */
+ uint32 nextOffset;
+
+ /* the Offset SLRU area was last truncated at this MultiXactId */
+ MultiXactId lastTruncationPoint;
+
+ /*
+ * Per-backend data starts here. We have two arrays stored in
+ * the area immediately following the MultiXactStateData struct.
+ * Each is indexed by BackendId. (Note: valid BackendIds run from 1 to
+ * MaxBackends; element zero of each array is never used.)
+ *
+ * OldestMemberMXactId[k] is the oldest MultiXactId each backend's
+ * current transaction(s) could possibly be a member of, or
+ * InvalidMultiXactId when the backend has no live transaction that
+ * could possibly be a member of a MultiXact. Each backend sets its
+ * entry to the current nextMXact counter just before first acquiring a
+ * shared lock in a given transaction, and clears it at transaction end.
+ * (This works because only during or after acquiring a shared lock
+ * could an XID possibly become a member of a MultiXact, and that
+ * MultiXact would have to be created during or after the lock
+ * acquisition.)
+ *
+ * OldestVisibleMXactId[k] is the oldest MultiXactId each backend's
+ * current transaction(s) think is potentially live, or InvalidMultiXactId
+ * when not in a transaction or not in a transaction that's paid any
+ * attention to MultiXacts yet. This is computed when first needed in
+ * a given transaction, and cleared at transaction end. We can compute
+ * it as the minimum of the valid OldestMemberMXactId[] entries at the
+ * time we compute it (using nextMXact if none are valid). Each backend
+ * is required not to attempt to access any SLRU data for MultiXactIds
+ * older than its own OldestVisibleMXactId[] setting; this is necessary
+ * because the checkpointer could truncate away such data at any instant.
+ *
+ * The checkpointer can compute the safe truncation point as the oldest
+ * valid value among all the OldestMemberMXactId[] and
+ * OldestVisibleMXactId[] entries, or nextMXact if none are valid.
+ * Clearly, it is not possible for any later-computed OldestVisibleMXactId
+ * value to be older than this, and so there is no risk of truncating
+ * data that is still needed.
+ */
+ MultiXactId perBackendXactIds[1]; /* VARIABLE LENGTH ARRAY */
+} MultiXactStateData;
+
+/* Pointers to the state data in shared memory */
+static MultiXactStateData *MultiXactState;
+static MultiXactId *OldestMemberMXactId;
+static MultiXactId *OldestVisibleMXactId;
+
+
+/*
+ * Definitions for the backend-local MultiXactId cache.
+ *
+ * We use this cache to store known MultiXacts, so we don't need to go to the
+ * SLRU areas every time.
+ *
+ * The cache lasts for the duration of a single transaction, the rationale
+ * for this being that most entries will contain our own TransactionId and
+ * so they will be uninteresting by the time our next transaction starts.
+ * (XXX not clear that this is correct --- other members of the MultiXact
+ * could hang around longer than we did.)
+ *
+ * We allocate the cache entries in a memory context that is deleted at
+ * transaction end, so we don't need to do retail freeing of entries.
+ */
+typedef struct mXactCacheEnt
+{
+ struct mXactCacheEnt *next;
+ MultiXactId multi;
+ int nxids;
+ TransactionId xids[1]; /* VARIABLE LENGTH ARRAY */
+} mXactCacheEnt;
+
+static mXactCacheEnt *MXactCache = NULL;
+static MemoryContext MXactContext = NULL;
+
+
+#ifdef MULTIXACT_DEBUG
+#define debug_elog2(a,b) elog(a,b)
+#define debug_elog3(a,b,c) elog(a,b,c)
+#define debug_elog4(a,b,c,d) elog(a,b,c,d)
+#define debug_elog5(a,b,c,d,e) elog(a,b,c,d,e)
+#else
+#define debug_elog2(a,b)
+#define debug_elog3(a,b,c)
+#define debug_elog4(a,b,c,d)
+#define debug_elog5(a,b,c,d,e)
+#endif
+
+/* internal MultiXactId management */
+static void MultiXactIdSetOldestVisible(void);
+static MultiXactId CreateMultiXactId(int nxids, TransactionId *xids);
+static int GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids);
+static MultiXactId GetNewMultiXactId(int nxids, uint32 *offset);
+
+/* MultiXact cache management */
+static MultiXactId mXactCacheGetBySet(int nxids, TransactionId *xids);
+static int mXactCacheGetById(MultiXactId multi, TransactionId **xids);
+static void mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids);
+static int xidComparator(const void *arg1, const void *arg2);
+#ifdef MULTIXACT_DEBUG
+static char *mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids);
+#endif
+
+/* management of SLRU infrastructure */
+static int ZeroMultiXactOffsetPage(int pageno);
+static int ZeroMultiXactMemberPage(int pageno);
+static bool MultiXactOffsetPagePrecedes(int page1, int page2);
+static bool MultiXactMemberPagePrecedes(int page1, int page2);
+static bool MultiXactIdPrecedes(MultiXactId multi1, MultiXactId multi2);
+static bool MultiXactOffsetPrecedes(uint32 offset1, uint32 offset2);
+static void ExtendMultiXactOffset(MultiXactId multi);
+static void ExtendMultiXactMember(uint32 offset);
+static void TruncateMultiXact(void);
+
+
+/*
+ * MultiXactIdExpand
+ * Add a TransactionId to a possibly-already-existing MultiXactId.
+ *
+ * We abuse the notation for the first argument: if "isMulti" is true, then
+ * it's really a MultiXactId; else it's a TransactionId. We are already
+ * storing MultiXactId in HeapTupleHeader's xmax so assuming the datatypes
+ * are equivalent is necessary anyway.
+ *
+ * If isMulti is true, then get the members of the passed MultiXactId, add
+ * the passed TransactionId, and create a new MultiXactId. If isMulti is
+ * false, then take the two TransactionIds and create a new MultiXactId with
+ * them. The caller must ensure that the multi and xid are different
+ * in the latter case.
+ *
+ * If the TransactionId is already a member of the passed MultiXactId,
+ * just return it as-is.
+ *
+ * Note that we do NOT actually modify the membership of a pre-existing
+ * MultiXactId; instead we create a new one. This is necessary to avoid
+ * a race condition against MultiXactIdWait (see notes there).
+ *
+ * NB - we don't worry about our local MultiXactId cache here, because that
+ * is handled by the lower-level routines.
+ */
+MultiXactId
+MultiXactIdExpand(MultiXactId multi, bool isMulti, TransactionId xid)
+{
+ MultiXactId newMulti;
+ TransactionId *members;
+ TransactionId *newMembers;
+ int nmembers;
+ int i;
+ int j;
+
+ AssertArg(MultiXactIdIsValid(multi));
+ AssertArg(TransactionIdIsValid(xid));
+
+ debug_elog5(DEBUG2, "Expand: received %s %u, xid %u",
+ isMulti ? "MultiXactId" : "TransactionId",
+ multi, xid);
+
+ if (!isMulti)
+ {
+ /*
+ * The first argument is a TransactionId, not a MultiXactId.
+ */
+ TransactionId xids[2];
+
+ Assert(!TransactionIdEquals(multi, xid));
+
+ xids[0] = multi;
+ xids[1] = xid;
+
+ newMulti = CreateMultiXactId(2, xids);
+
+ debug_elog5(DEBUG2, "Expand: returning %u two-elem %u/%u",
+ newMulti, multi, xid);
+
+ return newMulti;
+ }
+
+ nmembers = GetMultiXactIdMembers(multi, &members);
+
+ if (nmembers < 0)
+ {
+ /*
+ * The MultiXactId is obsolete. This can only happen if all the
+ * MultiXactId members stop running between the caller checking and
+ * passing it to us. It would be better to return that fact to the
+ * caller, but it would complicate the API and it's unlikely to happen
+ * too often, so just deal with it by creating a singleton MultiXact.
+ */
+ newMulti = CreateMultiXactId(1, &xid);
+
+ debug_elog4(DEBUG2, "Expand: %u has no members, create singleton %u",
+ multi, newMulti);
+ return newMulti;
+ }
+
+ /*
+ * If the TransactionId is already a member of the MultiXactId,
+ * just return the existing MultiXactId.
+ */
+ for (i = 0; i < nmembers; i++)
+ {
+ if (TransactionIdEquals(members[i], xid))
+ {
+ pfree(members);
+ debug_elog4(DEBUG2, "Expand: %u is already a member of %u",
+ xid, multi);
+ return multi;
+ }
+ }
+
+ /*
+ * Determine which of the members of the MultiXactId are still running,
+ * and use them to create a new one. (Removing dead members is just
+ * an optimization, but a useful one. Note we have the same race
+ * condition here as above: j could be 0 at the end of the loop.)
+ */
+ newMembers = (TransactionId *)
+ palloc(sizeof(TransactionId) * (nmembers + 1));
+
+ for (i = 0, j = 0; i < nmembers; i++)
+ {
+ if (TransactionIdIsInProgress(members[i]))
+ newMembers[j++] = members[i];
+ }
+
+ newMembers[j++] = xid;
+ newMulti = CreateMultiXactId(j, newMembers);
+
+ pfree(members);
+ pfree(newMembers);
+
+ debug_elog3(DEBUG2, "Expand: returning new multi %u", newMulti);
+
+ return newMulti;
+}
+
+/*
+ * MultiXactIdIsRunning
+ * Returns whether a MultiXactId is "running".
+ *
+ * We return true if at least one member of the given MultiXactId is still
+ * running. Note that a "false" result is certain not to change,
+ * because it is not legal to add members to an existing MultiXactId.
+ */
+bool
+MultiXactIdIsRunning(MultiXactId multi)
+{
+ TransactionId *members;
+ TransactionId myXid;
+ int nmembers;
+ int i;
+
+ debug_elog3(DEBUG2, "IsRunning %u?", multi);
+
+ nmembers = GetMultiXactIdMembers(multi, &members);
+
+ if (nmembers < 0)
+ {
+ debug_elog2(DEBUG2, "IsRunning: no members");
+ return false;
+ }
+
+ /* checking for myself is cheap */
+ myXid = GetTopTransactionId();
+
+ for (i = 0; i < nmembers; i++)
+ {
+ if (TransactionIdEquals(members[i], myXid))
+ {
+ pfree(members);
+ debug_elog3(DEBUG2, "IsRunning: I (%d) am running!", i);
+ return true;
+ }
+ }
+
+ /*
+ * This could be made better by having a special entry point in sinval.c,
+ * walking the PGPROC array only once for the whole array. But in most
+ * cases nmembers should be small enough that it doesn't much matter.
+ */
+ for (i = 0; i < nmembers; i++)
+ {
+ if (TransactionIdIsInProgress(members[i]))
+ {
+ pfree(members);
+ debug_elog4(DEBUG2, "IsRunning: member %d (%u) is running",
+ i, members[i]);
+ return true;
+ }
+ }
+
+ pfree(members);
+ debug_elog3(DEBUG2, "IsRunning: %u is not running", multi);
+
+ return false;
+}
+
+/*
+ * MultiXactIdSetOldestMember
+ * Save the oldest MultiXactId this transaction could be a member of.
+ *
+ * We set the OldestMemberMXactId for a given transaction the first time
+ * it's going to acquire a shared lock. We need to do this even if we end
+ * up using a TransactionId instead of a MultiXactId, because there is a
+ * chance that another transaction would add our XID to a MultiXactId.
+ *
+ * The value to set is the next-to-be-assigned MultiXactId, so this is meant
+ * to be called just before acquiring a shared lock.
+ */
+void
+MultiXactIdSetOldestMember(void)
+{
+ if (!MultiXactIdIsValid(OldestMemberMXactId[MyBackendId]))
+ {
+ MultiXactId nextMXact;
+
+ /*
+ * You might think we don't need to acquire a lock here, since
+ * fetching and storing of TransactionIds is probably atomic,
+ * but in fact we do: suppose we pick up nextMXact and then
+ * lose the CPU for a long time. Someone else could advance
+ * nextMXact, and then another someone else could compute an
+ * OldestVisibleMXactId that would be after the value we are
+ * going to store when we get control back. Which would be wrong.
+ */
+ LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
+
+ /*
+ * We have to beware of the possibility that nextMXact is in the
+ * wrapped-around state. We don't fix the counter itself here,
+ * but we must be sure to store a valid value in our array entry.
+ */
+ nextMXact = MultiXactState->nextMXact;
+ if (nextMXact < FirstMultiXactId)
+ nextMXact = FirstMultiXactId;
+
+ OldestMemberMXactId[MyBackendId] = nextMXact;
+
+ LWLockRelease(MultiXactGenLock);
+
+ debug_elog4(DEBUG2, "MultiXact: setting OldestMember[%d] = %u",
+ MyBackendId, nextMXact);
+ }
+}
+
+/*
+ * MultiXactIdSetOldestVisible
+ * Save the oldest MultiXactId this transaction considers possibly live.
+ *
+ * We set the OldestVisibleMXactId for a given transaction the first time
+ * it's going to inspect any MultiXactId. Once we have set this, we are
+ * guaranteed that the checkpointer won't truncate off SLRU data for
+ * MultiXactIds at or after our OldestVisibleMXactId.
+ *
+ * The value to set is the oldest of nextMXact and all the valid per-backend
+ * OldestMemberMXactId[] entries. Because of the locking we do, we can be
+ * certain that no subsequent call to MultiXactIdSetOldestMember can set
+ * an OldestMemberMXactId[] entry older than what we compute here. Therefore
+ * there is no live transaction, now or later, that can be a member of any
+ * MultiXactId older than the OldestVisibleMXactId we compute here.
+ */
+static void
+MultiXactIdSetOldestVisible(void)
+{
+ if (!MultiXactIdIsValid(OldestVisibleMXactId[MyBackendId]))
+ {
+ MultiXactId oldestMXact;
+ int i;
+
+ LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
+
+ /*
+ * We have to beware of the possibility that nextMXact is in the
+ * wrapped-around state. We don't fix the counter itself here,
+ * but we must be sure to store a valid value in our array entry.
+ */
+ oldestMXact = MultiXactState->nextMXact;
+ if (oldestMXact < FirstMultiXactId)
+ oldestMXact = FirstMultiXactId;
+
+ for (i = 1; i <= MaxBackends; i++)
+ {
+ MultiXactId thisoldest = OldestMemberMXactId[i];
+
+ if (MultiXactIdIsValid(thisoldest) &&
+ MultiXactIdPrecedes(thisoldest, oldestMXact))
+ oldestMXact = thisoldest;
+ }
+
+ OldestVisibleMXactId[MyBackendId] = oldestMXact;
+
+ LWLockRelease(MultiXactGenLock);
+
+ debug_elog4(DEBUG2, "MultiXact: setting OldestVisible[%d] = %u",
+ MyBackendId, oldestMXact);
+ }
+}
+
+/*
+ * MultiXactIdWait
+ * Sleep on a MultiXactId.
+ *
+ * We do this by sleeping on each member using XactLockTableWait. Any
+ * members that belong to the current backend are *not* waited for, however;
+ * this would not merely be useless but would lead to Assert failure inside
+ * XactLockTableWait. By the time this returns, it is certain that all
+ * transactions *of other backends* that were members of the MultiXactId
+ * are dead (and no new ones can have been added, since it is not legal
+ * to add members to an existing MultiXactId).
+ *
+ * But by the time we finish sleeping, someone else may have changed the Xmax
+ * of the containing tuple, so the caller needs to iterate on us somehow.
+ */
+void
+MultiXactIdWait(MultiXactId multi)
+{
+ TransactionId *members;
+ int nmembers;
+
+ nmembers = GetMultiXactIdMembers(multi, &members);
+
+ if (nmembers >= 0)
+ {
+ int i;
+
+ for (i = 0; i < nmembers; i++)
+ {
+ TransactionId member = members[i];
+
+ debug_elog4(DEBUG2, "MultiXactIdWait: waiting for %d (%u)",
+ i, member);
+ if (!TransactionIdIsCurrentTransactionId(member))
+ XactLockTableWait(member);
+ }
+
+ pfree(members);
+ }
+}
+
+/*
+ * CreateMultiXactId
+ * Make a new MultiXactId
+ *
+ * Make SLRU and cache entries for a new MultiXactId, recording the given
+ * TransactionIds as members. Returns the newly created MultiXactId.
+ *
+ * NB: the passed xids[] array will be sorted in-place.
+ */
+static MultiXactId
+CreateMultiXactId(int nxids, TransactionId *xids)
+{
+ MultiXactId multi;
+ int pageno;
+ int prev_pageno;
+ int entryno;
+ int slotno;
+ uint32 *offptr;
+ uint32 offset;
+ int i;
+
+ debug_elog3(DEBUG2, "Create: %s",
+ mxid_to_string(InvalidMultiXactId, nxids, xids));
+
+ /*
+ * See if the same set of XIDs already exists in our cache; if so, just
+ * re-use that MultiXactId. (Note: it might seem that looking in our
+ * cache is insufficient, and we ought to search disk to see if a
+ * duplicate definition already exists. But since we only ever create
+ * MultiXacts containing our own XID, in most cases any such MultiXacts
+ * were in fact created by us, and so will be in our cache. There are
+ * corner cases where someone else added us to a MultiXact without our
+ * knowledge, but it's not worth checking for.)
+ */
+ multi = mXactCacheGetBySet(nxids, xids);
+ if (MultiXactIdIsValid(multi))
+ {
+ debug_elog2(DEBUG2, "Create: in cache!");
+ return multi;
+ }
+
+ multi = GetNewMultiXactId(nxids, &offset);
+
+ LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
+
+ ExtendMultiXactOffset(multi);
+
+ pageno = MultiXactIdToOffsetPage(multi);
+ entryno = MultiXactIdToOffsetEntry(multi);
+
+ /*
+ * Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction"
+ * to complain about if there's any I/O error. This is kinda bogus, but
+ * since the errors will always give the full pathname, it should be
+ * clear enough that a MultiXactId is really involved. Perhaps someday
+ * we'll take the trouble to generalize the slru.c error reporting code.
+ */
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, multi);
+ offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
+ offptr += entryno;
+ *offptr = offset;
+
+ MultiXactOffsetCtl->shared->page_status[slotno] = SLRU_PAGE_DIRTY;
+
+ /* Exchange our lock */
+ LWLockRelease(MultiXactOffsetControlLock);
+
+ debug_elog3(DEBUG2, "Create: got offset %u", offset);
+
+ LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
+
+ prev_pageno = -1;
+
+ for (i = 0; i < nxids; i++, offset++)
+ {
+ TransactionId *memberptr;
+
+ ExtendMultiXactMember(offset);
+
+ pageno = MXOffsetToMemberPage(offset);
+ entryno = MXOffsetToMemberEntry(offset);
+
+ if (pageno != prev_pageno)
+ {
+ slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, multi);
+ prev_pageno = pageno;
+ }
+
+ memberptr = (TransactionId *)
+ MultiXactMemberCtl->shared->page_buffer[slotno];
+ memberptr += entryno;
+
+ *memberptr = xids[i];
+ MultiXactMemberCtl->shared->page_status[slotno] = SLRU_PAGE_DIRTY;
+ }
+
+ LWLockRelease(MultiXactMemberControlLock);
+
+ /* Store the new MultiXactId in the local cache, too */
+ mXactCachePut(multi, nxids, xids);
+ debug_elog2(DEBUG2, "Create: all done");
+
+ return multi;
+}
+
+/*
+ * GetNewMultiXactId
+ * Get the next MultiXactId.
+ *
+ * Get the next MultiXactId, XLogging if needed. Also, reserve the needed
+ * amount of space in the "members" area. The starting offset of the
+ * reserved space is returned in *offset.
+ */
+static MultiXactId
+GetNewMultiXactId(int nxids, uint32 *offset)
+{
+ MultiXactId result;
+
+ debug_elog3(DEBUG2, "GetNew: for %d xids", nxids);
+
+ /* MultiXactIdSetOldestMember() must have been called already */
+ Assert(MultiXactIdIsValid(OldestMemberMXactId[MyBackendId]));
+
+ LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
+
+ /* Handle wraparound of the nextMXact counter */
+ if (MultiXactState->nextMXact < FirstMultiXactId)
+ {
+ MultiXactState->nextMXact = FirstMultiXactId;
+ MultiXactState->mXactCount = 0;
+ }
+
+	/* If we have used up all the MultiXactIds logged for use, log more */
+ if (MultiXactState->mXactCount == 0)
+ {
+ XLogPutNextMultiXactId(MultiXactState->nextMXact + MXACT_PREFETCH);
+ MultiXactState->mXactCount = MXACT_PREFETCH;
+ }
+
+ result = MultiXactState->nextMXact;
+
+ /*
+ * We don't care about MultiXactId wraparound here; it will be handled by
+ * the next iteration. But note that nextMXact may be InvalidMultiXactId
+ * after this routine exits, so anyone else looking at the variable must
+ * be prepared to deal with that.
+ */
+ (MultiXactState->nextMXact)++;
+ (MultiXactState->mXactCount)--;
+
+ /*
+ * Reserve the members space.
+ */
+ *offset = MultiXactState->nextOffset;
+ MultiXactState->nextOffset += nxids;
+
+ LWLockRelease(MultiXactGenLock);
+
+ debug_elog4(DEBUG2, "GetNew: returning %u offset %u", result, *offset);
+ return result;
+}
+
+/*
+ * GetMultiXactIdMembers
+ * Returns the set of TransactionIds that make up a MultiXactId
+ *
+ * We return -1 if the MultiXactId is too old to possibly have any members
+ * still running; in that case we have not actually looked them up, and
+ * *xids is not set.
+ */
+static int
+GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
+{
+ int pageno;
+ int prev_pageno;
+ int entryno;
+ int slotno;
+ uint32 *offptr;
+ uint32 offset;
+ int length;
+ int i;
+ MultiXactId nextMXact;
+ MultiXactId tmpMXact;
+ uint32 nextOffset;
+ TransactionId *ptr;
+
+ debug_elog3(DEBUG2, "GetMembers: asked for %u", multi);
+
+ Assert(MultiXactIdIsValid(multi));
+
+ /* See if the MultiXactId is in the local cache */
+ length = mXactCacheGetById(multi, xids);
+ if (length >= 0)
+ {
+ debug_elog3(DEBUG2, "GetMembers: found %s in the cache",
+ mxid_to_string(multi, length, *xids));
+ return length;
+ }
+
+ /* Set our OldestVisibleMXactId[] entry if we didn't already */
+ MultiXactIdSetOldestVisible();
+
+ /*
+ * We check known limits on MultiXact before resorting to the SLRU area.
+ *
+ * An ID older than our OldestVisibleMXactId[] entry can't possibly still
+ * be running, and we'd run the risk of trying to read already-truncated
+ * SLRU data if we did try to examine it.
+ *
+ * Conversely, an ID >= nextMXact shouldn't ever be seen here; if it is
+ * seen, it implies undetected ID wraparound has occurred. We just
+ * silently assume that such an ID is no longer running.
+ *
+ * Shared lock is enough here since we aren't modifying any global state.
+ * Also, we can examine our own OldestVisibleMXactId without the lock,
+ * since no one else is allowed to change it.
+ */
+ if (MultiXactIdPrecedes(multi, OldestVisibleMXactId[MyBackendId]))
+ {
+ debug_elog2(DEBUG2, "GetMembers: it's too old");
+ *xids = NULL;
+ return -1;
+ }
+
+ LWLockAcquire(MultiXactGenLock, LW_SHARED);
+
+ if (!MultiXactIdPrecedes(multi, MultiXactState->nextMXact))
+ {
+ LWLockRelease(MultiXactGenLock);
+ debug_elog2(DEBUG2, "GetMembers: it's too new!");
+ *xids = NULL;
+ return -1;
+ }
+
+ /*
+ * Before releasing the lock, save the current counter values, because
+ * the target MultiXactId may be just one less than nextMXact. We will
+ * need to use nextOffset as the endpoint if so.
+ */
+ nextMXact = MultiXactState->nextMXact;
+ nextOffset = MultiXactState->nextOffset;
+
+ LWLockRelease(MultiXactGenLock);
+
+ /* Get the offset at which we need to start reading MultiXactMembers */
+ LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
+
+ pageno = MultiXactIdToOffsetPage(multi);
+ entryno = MultiXactIdToOffsetEntry(multi);
+
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, multi);
+ offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
+ offptr += entryno;
+ offset = *offptr;
+
+ /*
+ * How many members do we need to read? If we are at the end of the
+ * assigned MultiXactIds, use the offset just saved above. Else we
+ * need to check the MultiXactId following ours.
+ *
+ * Use the same increment rule as GetNewMultiXactId(), that is, don't
+ * handle wraparound explicitly until needed.
+ */
+ tmpMXact = multi + 1;
+
+ if (nextMXact == tmpMXact)
+ length = nextOffset - offset;
+ else
+ {
+ /* handle wraparound if needed */
+ if (tmpMXact < FirstMultiXactId)
+ tmpMXact = FirstMultiXactId;
+
+ prev_pageno = pageno;
+
+ pageno = MultiXactIdToOffsetPage(tmpMXact);
+ entryno = MultiXactIdToOffsetEntry(tmpMXact);
+
+ if (pageno != prev_pageno)
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, tmpMXact);
+
+ offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
+ offptr += entryno;
+ length = *offptr - offset;
+ }
+
+ LWLockRelease(MultiXactOffsetControlLock);
+
+ ptr = (TransactionId *) palloc(length * sizeof(TransactionId));
+ *xids = ptr;
+
+ /* Now get the members themselves. */
+ LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
+
+ prev_pageno = -1;
+ for (i = 0; i < length; i++, offset++)
+ {
+ TransactionId *xactptr;
+
+ pageno = MXOffsetToMemberPage(offset);
+ entryno = MXOffsetToMemberEntry(offset);
+
+ if (pageno != prev_pageno)
+ {
+ slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, multi);
+ prev_pageno = pageno;
+ }
+
+ xactptr = (TransactionId *)
+ MultiXactMemberCtl->shared->page_buffer[slotno];
+ xactptr += entryno;
+
+ ptr[i] = *xactptr;
+ }
+
+ LWLockRelease(MultiXactMemberControlLock);
+
+ /*
+ * Copy the result into the local cache.
+ */
+ mXactCachePut(multi, length, ptr);
+
+ debug_elog3(DEBUG2, "GetMembers: no cache for %s",
+ mxid_to_string(multi, length, ptr));
+ return length;
+}
+
+/*
+ * mXactCacheGetBySet
+ * returns a MultiXactId from the cache based on the set of
+ * TransactionIds that compose it, or InvalidMultiXactId if
+ * none matches.
+ *
+ * This is helpful, for example, if two transactions want to lock a huge
+ * table. By using the cache, the second will use the same MultiXactId
+ * for the majority of tuples, thus keeping MultiXactId usage low (saving
+ * both I/O and wraparound issues).
+ *
+ * NB: the passed xids[] array will be sorted in-place.
+ */
+static MultiXactId
+mXactCacheGetBySet(int nxids, TransactionId *xids)
+{
+ mXactCacheEnt *entry;
+
+ debug_elog3(DEBUG2, "CacheGet: looking for %s",
+ mxid_to_string(InvalidMultiXactId, nxids, xids));
+
+ /* sort the array so comparison is easy */
+ qsort(xids, nxids, sizeof(TransactionId), xidComparator);
+
+ for (entry = MXactCache; entry != NULL; entry = entry->next)
+ {
+ if (entry->nxids != nxids)
+ continue;
+
+ /* We assume the cache entries are sorted */
+ if (memcmp(xids, entry->xids, nxids * sizeof(TransactionId)) == 0)
+ {
+ debug_elog3(DEBUG2, "CacheGet: found %u", entry->multi);
+ return entry->multi;
+ }
+ }
+
+ debug_elog2(DEBUG2, "CacheGet: not found :-(");
+ return InvalidMultiXactId;
+}
+
+/*
+ * mXactCacheGetById
+ * returns the composing TransactionId set from the cache for a
+ * given MultiXactId, if present.
+ *
+ * If successful, *xids is set to the address of a palloc'd copy of the
+ * TransactionId set. Return value is number of members, or -1 on failure.
+ */
+static int
+mXactCacheGetById(MultiXactId multi, TransactionId **xids)
+{
+ mXactCacheEnt *entry;
+
+ debug_elog3(DEBUG2, "CacheGet: looking for %u", multi);
+
+ for (entry = MXactCache; entry != NULL; entry = entry->next)
+ {
+ if (entry->multi == multi)
+ {
+ TransactionId *ptr;
+ Size size;
+
+ size = sizeof(TransactionId) * entry->nxids;
+ ptr = (TransactionId *) palloc(size);
+ *xids = ptr;
+
+ memcpy(ptr, entry->xids, size);
+
+ debug_elog3(DEBUG2, "CacheGet: found %s",
+ mxid_to_string(multi, entry->nxids, entry->xids));
+ return entry->nxids;
+ }
+ }
+
+ debug_elog2(DEBUG2, "CacheGet: not found");
+ return -1;
+}
+
+/*
+ * mXactCachePut
+ * Add a new MultiXactId and its composing set into the local cache.
+ */
+static void
+mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids)
+{
+ mXactCacheEnt *entry;
+
+ debug_elog3(DEBUG2, "CachePut: storing %s",
+ mxid_to_string(multi, nxids, xids));
+
+ if (MXactContext == NULL)
+ {
+ /* The cache only lives as long as the current transaction */
+ debug_elog2(DEBUG2, "CachePut: initializing memory context");
+ MXactContext = AllocSetContextCreate(TopTransactionContext,
+ "MultiXact Cache Context",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+ }
+
+ entry = (mXactCacheEnt *)
+ MemoryContextAlloc(MXactContext,
+ offsetof(mXactCacheEnt, xids) +
+ nxids * sizeof(TransactionId));
+
+ entry->multi = multi;
+ entry->nxids = nxids;
+ memcpy(entry->xids, xids, nxids * sizeof(TransactionId));
+
+ /* mXactCacheGetBySet assumes the entries are sorted, so sort them */
+ qsort(entry->xids, nxids, sizeof(TransactionId), xidComparator);
+
+ entry->next = MXactCache;
+ MXactCache = entry;
+}
+
+/*
+ * xidComparator
+ * qsort comparison function for XIDs
+ *
+ * We don't need to use wraparound comparison for XIDs, and indeed must
+ * not do so since that does not respect the triangle inequality! Any
+ * old sort order will do.
+ */
+static int
+xidComparator(const void *arg1, const void *arg2)
+{
+ TransactionId xid1 = * (const TransactionId *) arg1;
+ TransactionId xid2 = * (const TransactionId *) arg2;
+
+ if (xid1 > xid2)
+ return 1;
+ if (xid1 < xid2)
+ return -1;
+ return 0;
+}
+
+#ifdef MULTIXACT_DEBUG
+/*
+ * mxid_to_string
+ *     Format a MultiXactId and its member XIDs as "multi n[x1, x2, ...]"
+ *     in a palloc'd string, for debug output.
+ *
+ * Note: assumes nxids >= 1, since xids[0] is printed unconditionally.
+ * The buffer allows 15 bytes per XID, ample for ", 4294967295".
+ */
+static char *
+mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids)
+{
+    char *str = palloc(15 * (nxids + 1) + 4);
+    int i;
+    snprintf(str, 47, "%u %d[%u", multi, nxids, xids[0]);
+
+    for (i = 1; i < nxids; i++)
+        snprintf(str + strlen(str), 17, ", %u", xids[i]);
+
+    strcat(str, "]");
+    return str;
+}
+#endif
+
+/*
+ * AtEOXact_MultiXact
+ *     Handle transaction end for MultiXact
+ *
+ * This is called at top transaction commit or abort (we don't care which).
+ */
+void
+AtEOXact_MultiXact(void)
+{
+    /*
+     * Reset our OldestMemberMXactId and OldestVisibleMXactId values,
+     * both of which should only be valid while within a transaction.
+     *
+     * We assume that storing a MultiXactId is atomic and so we need
+     * not take MultiXactGenLock to do this.
+     */
+    /* MyBackendId indexes the per-backend arrays set up in MultiXactShmemInit */
+    OldestMemberMXactId[MyBackendId] = InvalidMultiXactId;
+    OldestVisibleMXactId[MyBackendId] = InvalidMultiXactId;
+
+    /*
+     * Discard the local MultiXactId cache.  Since MXactContext was created
+     * as a child of TopTransactionContext, we needn't delete it explicitly.
+     */
+    MXactContext = NULL;
+    MXactCache = NULL;
+}
+
+/*
+ * Initialization of shared memory for MultiXact.  We use two SLRU areas,
+ * thus double memory.  Also, reserve space for the shared MultiXactState
+ * struct and the per-backend MultiXactId arrays (two of those, too).
+ */
+int
+MultiXactShmemSize(void)
+{
+/* Two per-backend arrays: OldestMemberMXactId and OldestVisibleMXactId */
+#define SHARED_MULTIXACT_STATE_SIZE \
+    (sizeof(MultiXactStateData) + sizeof(MultiXactId) * 2 * MaxBackends)
+
+    return (SimpleLruShmemSize() * 2 + SHARED_MULTIXACT_STATE_SIZE);
+}
+
+/*
+ * MultiXactShmemInit
+ *     Set up the two MultiXact SLRU areas and the shared state struct.
+ *
+ * Called once per postmaster lifetime (and again in each backend under
+ * EXEC_BACKEND-style startup, where the struct already exists).
+ */
+void
+MultiXactShmemInit(void)
+{
+    bool found;
+
+    debug_elog2(DEBUG2, "Shared Memory Init for MultiXact");
+
+    MultiXactOffsetCtl->PagePrecedes = MultiXactOffsetPagePrecedes;
+    MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
+
+    SimpleLruInit(MultiXactOffsetCtl, "MultiXactOffset Ctl",
+                  MultiXactOffsetControlLock, "pg_multixact/offsets");
+    SimpleLruInit(MultiXactMemberCtl, "MultiXactMember Ctl",
+                  MultiXactMemberControlLock, "pg_multixact/members");
+
+    /* Override default assumption that writes should be fsync'd */
+    MultiXactOffsetCtl->do_fsync = false;
+    MultiXactMemberCtl->do_fsync = false;
+
+    /* Initialize our shared state struct */
+    MultiXactState = ShmemInitStruct("Shared MultiXact State",
+                                     SHARED_MULTIXACT_STATE_SIZE,
+                                     &found);
+    if (!IsUnderPostmaster)
+    {
+        Assert(!found);
+
+        /* Make sure we zero out the per-backend state */
+        MemSet(MultiXactState, 0, SHARED_MULTIXACT_STATE_SIZE);
+    }
+    else
+        Assert(found);
+
+    /*
+     * Set up array pointers.  Note that perBackendXactIds[0] is wasted
+     * space since we only use indexes 1..MaxBackends in each array.
+     */
+    OldestMemberMXactId = MultiXactState->perBackendXactIds;
+    OldestVisibleMXactId = OldestMemberMXactId + MaxBackends;
+}
+
+/*
+ * This func must be called ONCE on system install.  It creates the initial
+ * MultiXact segments.  (The MultiXacts directories are assumed to have been
+ * created by initdb, and MultiXactShmemInit must have been called already.)
+ *
+ * Note: it's not really necessary to create the initial segments now,
+ * since slru.c would create 'em on first write anyway.  But we may as well
+ * do it to be sure the directories are set up correctly.
+ */
+void
+BootStrapMultiXact(void)
+{
+    int slotno;
+
+    LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
+
+    /* Offsets first page: zero it in shmem and force it out to disk */
+    slotno = ZeroMultiXactOffsetPage(0);
+    SimpleLruWritePage(MultiXactOffsetCtl, slotno, NULL);
+    Assert(MultiXactOffsetCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);
+
+    LWLockRelease(MultiXactOffsetControlLock);
+
+    LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
+
+    /* Members first page: likewise */
+    slotno = ZeroMultiXactMemberPage(0);
+    SimpleLruWritePage(MultiXactMemberCtl, slotno, NULL);
+    Assert(MultiXactMemberCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);
+
+    LWLockRelease(MultiXactMemberControlLock);
+}
+
+/*
+ * Initialize (or reinitialize) a page of MultiXactOffset to zeroes.
+ * pageno is the MultiXactOffset page number to set up.
+ *
+ * The page is not actually written, just set up in shared memory.
+ * The slot number of the new page is returned.
+ *
+ * Control lock must be held at entry, and will be held at exit.
+ */
+static int
+ZeroMultiXactOffsetPage(int pageno)
+{
+    return SimpleLruZeroPage(MultiXactOffsetCtl, pageno);
+}
+
+/*
+ * Ditto, for MultiXactMember: zero the page in shared memory only, and
+ * return its slot number.  Control lock must be held at entry and exit.
+ */
+static int
+ZeroMultiXactMemberPage(int pageno)
+{
+    return SimpleLruZeroPage(MultiXactMemberCtl, pageno);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup.
+ *
+ * StartupXLOG has already established nextMXact by calling
+ * MultiXactSetNextMXact and/or MultiXactAdvanceNextMXact.
+ *
+ * We don't need any locks here, really; the SLRU locks are taken
+ * only because slru.c expects to be called with locks held.
+ */
+void
+StartupMultiXact(void)
+{
+    int startPage;
+    int cutoffPage;
+    uint32 offset;
+
+    /*
+     * We start nextOffset at zero after every reboot; there is no need to
+     * avoid offset values that were used in the previous system lifecycle.
+     */
+    MultiXactState->nextOffset = 0;
+
+    /*
+     * Because of the above, a shutdown and restart is likely to leave
+     * high-numbered MultiXactMember page files that would not get recycled
+     * for a long time (about as long as the system had been up in the
+     * previous cycle of life).  To clean out such page files, we issue an
+     * artificial truncation call that will zap any page files in the first
+     * half of the offset cycle.  Should there be any page files in the last
+     * half, they will get cleaned out by the first checkpoint.
+     *
+     * XXX it might be a good idea to disable this when debugging, since it
+     * will tend to destroy evidence after a crash.  To not be *too* ruthless,
+     * we arbitrarily spare the first 64 pages.  (Note this will get
+     * rounded off to a multiple of SLRU_PAGES_PER_SEGMENT ...)
+     */
+    /* 0x80000000: the first offset in the second half of the cycle */
+    offset = ((~ (uint32) 0) >> 1) + 1;
+
+    cutoffPage = MXOffsetToMemberPage(offset) + 64;
+
+    /*
+     * Defeat safety interlock in SimpleLruTruncate; this hack will be
+     * cleaned up by ZeroMultiXactMemberPage call below.
+     */
+    MultiXactMemberCtl->shared->latest_page_number = cutoffPage;
+
+    SimpleLruTruncate(MultiXactMemberCtl, cutoffPage);
+
+    /*
+     * Initialize lastTruncationPoint to invalid, ensuring that the first
+     * checkpoint will try to do truncation.
+     */
+    MultiXactState->lastTruncationPoint = InvalidMultiXactId;
+
+    /*
+     * Since we don't expect MultiXact to be valid across crashes, we
+     * initialize the currently-active pages to zeroes during startup.
+     * Whenever we advance into a new page, both ExtendMultiXact routines
+     * will likewise zero the new page without regard to whatever was
+     * previously on disk.
+     */
+    LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
+
+    startPage = MultiXactIdToOffsetPage(MultiXactState->nextMXact);
+    (void) ZeroMultiXactOffsetPage(startPage);
+
+    LWLockRelease(MultiXactOffsetControlLock);
+
+    LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);
+
+    startPage = MXOffsetToMemberPage(MultiXactState->nextOffset);
+    (void) ZeroMultiXactMemberPage(startPage);
+
+    LWLockRelease(MultiXactMemberControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown
+ */
+void
+ShutdownMultiXact(void)
+{
+    /*
+     * Flush dirty MultiXact pages to disk
+     *
+     * This is not actually necessary from a correctness point of view. We do
+     * it merely as a debugging aid.
+     *
+     * NOTE(review): second argument presumably means "called from
+     * checkpoint" (CheckPointMultiXact passes true) -- verify in slru.c.
+     */
+    SimpleLruFlush(MultiXactOffsetCtl, false);
+    SimpleLruFlush(MultiXactMemberCtl, false);
+}
+
+/*
+ * Get the next MultiXactId to save in a checkpoint record
+ */
+MultiXactId
+MultiXactGetCheckptMulti(bool is_shutdown)
+{
+    MultiXactId retval;
+
+    LWLockAcquire(MultiXactGenLock, LW_SHARED);
+
+    /*
+     * For a shutdown checkpoint, nextMXact is exact.  Otherwise we also
+     * skip past mXactCount; NOTE(review): this apparently accounts for
+     * multis handed out but not yet durable -- confirm against the
+     * allocation code (GetNewMultiXactId).
+     */
+    retval = MultiXactState->nextMXact;
+    if (!is_shutdown)
+        retval += MultiXactState->mXactCount;
+
+    LWLockRelease(MultiXactGenLock);
+
+    debug_elog3(DEBUG2, "MultiXact: MultiXact for checkpoint record is %u",
+                retval);
+
+    return retval;
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ */
+void
+CheckPointMultiXact(void)
+{
+    /*
+     * Flush dirty MultiXact pages to disk
+     *
+     * This is not actually necessary from a correctness point of view. We do
+     * it merely to improve the odds that writing of dirty pages is done
+     * by the checkpoint process and not by backends.
+     */
+    SimpleLruFlush(MultiXactOffsetCtl, true);
+    SimpleLruFlush(MultiXactMemberCtl, true);
+
+    /*
+     * Truncate the SLRU files (TruncateMultiXact assumes only one backend
+     * performs checkpoints at a time).
+     */
+    TruncateMultiXact();
+}
+
+/*
+ * Set the next-to-be-assigned MultiXactId
+ *
+ * This is used when we can determine the correct next Id exactly
+ * from an XLog record.  We need no locking since it is only called
+ * during bootstrap and XLog replay.
+ */
+void
+MultiXactSetNextMXact(MultiXactId nextMulti)
+{
+    debug_elog3(DEBUG2, "MultiXact: setting next multi to %u", nextMulti);
+    MultiXactState->nextMXact = nextMulti;
+    /* The pending-allocation count is meaningless after an exact reset */
+    MultiXactState->mXactCount = 0;
+}
+
+/*
+ * Ensure the next-to-be-assigned MultiXactId is at least minMulti
+ *
+ * This is used when we can determine a minimum safe value
+ * from an XLog record.  We need no locking since it is only called
+ * during XLog replay.  Uses wraparound-aware comparison, so a no-op
+ * when nextMXact is already at or past minMulti.
+ */
+void
+MultiXactAdvanceNextMXact(MultiXactId minMulti)
+{
+    if (MultiXactIdPrecedes(MultiXactState->nextMXact, minMulti))
+    {
+        debug_elog3(DEBUG2, "MultiXact: setting next multi to %u", minMulti);
+        MultiXactState->nextMXact = minMulti;
+        MultiXactState->mXactCount = 0;
+    }
+}
+
+/*
+ * Make sure that MultiXactOffset has room for a newly-allocated MultiXactId.
+ *
+ * The MultiXactOffsetControlLock should be held at entry, and will
+ * be held at exit.  (ZeroMultiXactOffsetPage requires it.)
+ */
+void
+ExtendMultiXactOffset(MultiXactId multi)
+{
+    int pageno;
+
+    /*
+     * No work except at first MultiXactId of a page.  But beware: just after
+     * wraparound, the first MultiXactId of page zero is FirstMultiXactId.
+     */
+    if (MultiXactIdToOffsetEntry(multi) != 0 &&
+        multi != FirstMultiXactId)
+        return;
+
+    pageno = MultiXactIdToOffsetPage(multi);
+
+    /* Zero the page in shared memory; it is written lazily by slru.c */
+    ZeroMultiXactOffsetPage(pageno);
+}
+
+/*
+ * Make sure that MultiXactMember has room for the members of a newly-
+ * allocated MultiXactId.
+ *
+ * The MultiXactMemberControlLock should be held at entry, and will be held
+ * at exit.  (ZeroMultiXactMemberPage requires it.)
+ */
+void
+ExtendMultiXactMember(uint32 offset)
+{
+    int pageno;
+
+    /*
+     * No work except at first entry of a page.  (Unlike the offsets SLRU,
+     * there is no "invalid offset" value to skip over.)
+     */
+    if (MXOffsetToMemberEntry(offset) != 0)
+        return;
+
+    pageno = MXOffsetToMemberPage(offset);
+
+    /* Zero the page in shared memory; it is written lazily by slru.c */
+    ZeroMultiXactMemberPage(pageno);
+}
+
+/*
+ * Remove all MultiXactOffset and MultiXactMember segments before the oldest
+ * ones still of interest.
+ *
+ * This is called only during checkpoints.  We assume no more than one
+ * backend does this at a time.
+ */
+static void
+TruncateMultiXact(void)
+{
+    MultiXactId nextMXact;
+    uint32 nextOffset;
+    MultiXactId oldestMXact;
+    uint32 oldestOffset;
+    int cutoffPage;
+    int i;
+
+    /*
+     * First, compute where we can safely truncate.  Per notes above,
+     * this is the oldest valid value among all the OldestMemberMXactId[] and
+     * OldestVisibleMXactId[] entries, or nextMXact if none are valid.
+     */
+    LWLockAcquire(MultiXactGenLock, LW_SHARED);
+
+    /*
+     * We have to beware of the possibility that nextMXact is in the
+     * wrapped-around state.  We don't fix the counter itself here,
+     * but we must be sure to use a valid value in our calculation.
+     */
+    nextMXact = MultiXactState->nextMXact;
+    if (nextMXact < FirstMultiXactId)
+        nextMXact = FirstMultiXactId;
+
+    /* Scan both per-backend arrays (slot 0 is unused; see ShmemInit) */
+    oldestMXact = nextMXact;
+    for (i = 1; i <= MaxBackends; i++)
+    {
+        MultiXactId thisoldest;
+
+        thisoldest = OldestMemberMXactId[i];
+        if (MultiXactIdIsValid(thisoldest) &&
+            MultiXactIdPrecedes(thisoldest, oldestMXact))
+            oldestMXact = thisoldest;
+        thisoldest = OldestVisibleMXactId[i];
+        if (MultiXactIdIsValid(thisoldest) &&
+            MultiXactIdPrecedes(thisoldest, oldestMXact))
+            oldestMXact = thisoldest;
+    }
+
+    /* Save the current nextOffset too */
+    nextOffset = MultiXactState->nextOffset;
+
+    LWLockRelease(MultiXactGenLock);
+
+    debug_elog3(DEBUG2, "MultiXact: truncation point = %u", oldestMXact);
+
+    /*
+     * If we already truncated at this point, do nothing.  This saves time
+     * when no MultiXacts are getting used, which is probably not uncommon.
+     * (Unlocked read is OK: only the checkpointing backend updates
+     * lastTruncationPoint, per the note at the bottom of this function.)
+     */
+    if (MultiXactState->lastTruncationPoint == oldestMXact)
+        return;
+
+    /*
+     * We need to determine where to truncate MultiXactMember.  If we
+     * found a valid oldest MultiXactId, read its starting offset;
+     * otherwise we use the nextOffset value we saved above.
+     */
+    if (oldestMXact == nextMXact)
+        oldestOffset = nextOffset;
+    else
+    {
+        int pageno;
+        int slotno;
+        int entryno;
+        uint32 *offptr;
+
+        LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);
+
+        pageno = MultiXactIdToOffsetPage(oldestMXact);
+        entryno = MultiXactIdToOffsetEntry(oldestMXact);
+
+        slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, oldestMXact);
+        offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
+        offptr += entryno;
+        oldestOffset = *offptr;
+
+        LWLockRelease(MultiXactOffsetControlLock);
+    }
+
+    /*
+     * The cutoff point is the start of the segment containing oldestMXact.
+     * We pass the *page* containing oldestMXact to SimpleLruTruncate.
+     */
+    cutoffPage = MultiXactIdToOffsetPage(oldestMXact);
+
+    SimpleLruTruncate(MultiXactOffsetCtl, cutoffPage);
+
+    /*
+     * Also truncate MultiXactMember at the previously determined offset.
+     */
+    cutoffPage = MXOffsetToMemberPage(oldestOffset);
+
+    SimpleLruTruncate(MultiXactMemberCtl, cutoffPage);
+
+    /*
+     * Set the last known truncation point.  We don't need a lock for this
+     * since only one backend does checkpoints at a time.
+     */
+    MultiXactState->lastTruncationPoint = oldestMXact;
+}
+
+/*
+ * Decide which of two MultiXactOffset page numbers is "older" for truncation
+ * purposes.
+ *
+ * We need to use comparison of MultiXactId here in order to do the right
+ * thing with wraparound.  However, if we are asked about page number zero, we
+ * don't want to hand InvalidMultiXactId to MultiXactIdPrecedes: it'll get
+ * weird.  So, offset both multis by FirstMultiXactId to avoid that.
+ * (Actually, the current implementation doesn't do anything weird with
+ * InvalidMultiXactId, but there's no harm in leaving this code like this.)
+ */
+static bool
+MultiXactOffsetPagePrecedes(int page1, int page2)
+{
+    MultiXactId multi1;
+    MultiXactId multi2;
+
+    /* Map each page number to the first MultiXactId stored on that page */
+    multi1 = ((MultiXactId) page1) * MULTIXACT_OFFSETS_PER_PAGE;
+    multi1 += FirstMultiXactId;
+    multi2 = ((MultiXactId) page2) * MULTIXACT_OFFSETS_PER_PAGE;
+    multi2 += FirstMultiXactId;
+
+    return MultiXactIdPrecedes(multi1, multi2);
+}
+
+/*
+ * Decide which of two MultiXactMember page numbers is "older" for truncation
+ * purposes.  There is no "invalid offset number" so use the numbers verbatim.
+ */
+static bool
+MultiXactMemberPagePrecedes(int page1, int page2)
+{
+    uint32 offset1;
+    uint32 offset2;
+
+    /* Map each page number to the first member offset stored on that page */
+    offset1 = ((uint32) page1) * MULTIXACT_MEMBERS_PER_PAGE;
+    offset2 = ((uint32) page2) * MULTIXACT_MEMBERS_PER_PAGE;
+
+    return MultiXactOffsetPrecedes(offset1, offset2);
+}
+
+/*
+ * Decide which of two MultiXactIds is earlier.
+ *
+ * Wraparound-aware: the signed 32-bit difference orders any two IDs that
+ * are within 2^31 of each other on the modular number line.
+ *
+ * XXX do we need to do something special for InvalidMultiXactId?
+ * (Doesn't look like it.)
+ */
+static bool
+MultiXactIdPrecedes(MultiXactId multi1, MultiXactId multi2)
+{
+    int32 diff = (int32) (multi1 - multi2);
+
+    return (diff < 0);
+}
+
+/*
+ * Decide which of two offsets is earlier.
+ *
+ * Same modular-comparison trick as MultiXactIdPrecedes.
+ */
+static bool
+MultiXactOffsetPrecedes(uint32 offset1, uint32 offset2)
+{
+    int32 diff = (int32) (offset1 - offset2);
+
+    return (diff < 0);
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 4d896ef551..a318db6134 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.199 2005/04/11 19:51:14 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.200 2005/04/28 21:47:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -20,6 +20,7 @@
#include <time.h>
#include <unistd.h>
+#include "access/multixact.h"
#include "access/subtrans.h"
#include "access/xact.h"
#include "catalog/heap.h"
@@ -1565,6 +1566,8 @@ CommitTransaction(void)
*/
smgrDoPendingDeletes(true);
+ AtEOXact_MultiXact();
+
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
true, true);
@@ -1710,6 +1713,7 @@ AbortTransaction(void)
AtEOXact_Buffers(false);
AtEOXact_Inval(false);
smgrDoPendingDeletes(false);
+ AtEOXact_MultiXact();
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
false, true);
@@ -3622,9 +3626,9 @@ static void
ShowTransactionState(const char *str)
{
/* skip work if message will definitely not be printed */
- if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2)
+ if (log_min_messages <= DEBUG3 || client_min_messages <= DEBUG3)
{
- elog(DEBUG2, "%s", str);
+ elog(DEBUG3, "%s", str);
ShowTransactionStateRec(CurrentTransactionState);
}
}
@@ -3640,7 +3644,7 @@ ShowTransactionStateRec(TransactionState s)
ShowTransactionStateRec(s->parent);
/* use ereport to suppress computation if msg will not be printed */
- ereport(DEBUG2,
+ ereport(DEBUG3,
(errmsg_internal("name: %s; blockState: %13s; state: %7s, xid/subid/cid: %u/%u/%u, nestlvl: %d, children: %s",
PointerIsValid(s->name) ? s->name : "unnamed",
BlockStateAsString(s->blockState),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f384bbbe71..48e8b425de 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.188 2005/04/23 18:49:54 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.189 2005/04/28 21:47:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -23,6 +23,7 @@
#include <sys/time.h>
#include "access/clog.h"
+#include "access/multixact.h"
#include "access/subtrans.h"
#include "access/xact.h"
#include "access/xlog.h"
@@ -3534,11 +3535,13 @@ BootStrapXLOG(void)
checkPoint.ThisTimeLineID = ThisTimeLineID;
checkPoint.nextXid = FirstNormalTransactionId;
checkPoint.nextOid = FirstBootstrapObjectId;
+ checkPoint.nextMulti = FirstMultiXactId;
checkPoint.time = time(NULL);
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
+ MultiXactSetNextMXact(checkPoint.nextMulti);
/* Set up the XLOG page header */
page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -3613,6 +3616,7 @@ BootStrapXLOG(void)
/* Bootstrap the commit log, too */
BootStrapCLOG();
BootStrapSUBTRANS();
+ BootStrapMultiXact();
}
static char *
@@ -4187,8 +4191,8 @@ StartupXLOG(void)
checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
wasShutdown ? "TRUE" : "FALSE")));
ereport(LOG,
- (errmsg("next transaction ID: %u; next OID: %u",
- checkPoint.nextXid, checkPoint.nextOid)));
+ (errmsg("next transaction ID: %u; next OID: %u; next MultiXactId: %u",
+ checkPoint.nextXid, checkPoint.nextOid, checkPoint.nextMulti)));
if (!TransactionIdIsNormal(checkPoint.nextXid))
ereport(PANIC,
(errmsg("invalid next transaction ID")));
@@ -4196,6 +4200,7 @@ StartupXLOG(void)
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
+ MultiXactSetNextMXact(checkPoint.nextMulti);
/*
* We must replay WAL entries using the same TimeLineID they were
@@ -4546,9 +4551,10 @@ StartupXLOG(void)
ControlFile->time = time(NULL);
UpdateControlFile();
- /* Start up the commit log, too */
+ /* Start up the commit log and related stuff, too */
StartupCLOG();
StartupSUBTRANS();
+ StartupMultiXact();
ereport(LOG,
(errmsg("database system is ready")));
@@ -4737,6 +4743,7 @@ ShutdownXLOG(int code, Datum arg)
CreateCheckPoint(true, true);
ShutdownCLOG();
ShutdownSUBTRANS();
+ ShutdownMultiXact();
CritSectionCount--;
ereport(LOG,
@@ -4919,6 +4926,8 @@ CreateCheckPoint(bool shutdown, bool force)
checkPoint.nextOid += ShmemVariableCache->oidCount;
LWLockRelease(OidGenLock);
+ checkPoint.nextMulti = MultiXactGetCheckptMulti(shutdown);
+
/*
* Having constructed the checkpoint record, ensure all shmem disk
* buffers and commit-log buffers are flushed to disk.
@@ -4938,6 +4947,7 @@ CreateCheckPoint(bool shutdown, bool force)
CheckPointCLOG();
CheckPointSUBTRANS();
+ CheckPointMultiXact();
FlushBufferPool();
START_CRIT_SECTION();
@@ -5054,6 +5064,33 @@ XLogPutNextOid(Oid nextOid)
rdata.len = sizeof(Oid);
rdata.next = NULL;
(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
+ /*
+ * We need not flush the NEXTOID record immediately, because any of the
+ * just-allocated OIDs could only reach disk as part of a tuple insert
+ * or update that would have its own XLOG record that must follow the
+ * NEXTOID record. Therefore, the standard buffer LSN interlock applied
+ * to those records will ensure no such OID reaches disk before the
+ * NEXTOID record does.
+ */
+}
+
+/*
+ * Write a NEXT_MULTIXACT log record
+ */
+void
+XLogPutNextMultiXactId(MultiXactId nextMulti)
+{
+ XLogRecData rdata;
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) (&nextMulti);
+ rdata.len = sizeof(MultiXactId);
+ rdata.next = NULL;
+ (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTMULTI, &rdata);
+ /*
+ * We do not flush here either; this assumes that heap_lock_tuple() will
+ * always generate a WAL record. See notes therein.
+ */
}
/*
@@ -5075,6 +5112,14 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ShmemVariableCache->oidCount = 0;
}
}
+ else if (info == XLOG_NEXTMULTI)
+ {
+ MultiXactId nextMulti;
+
+ memcpy(&nextMulti, XLogRecGetData(record), sizeof(MultiXactId));
+
+ MultiXactAdvanceNextMXact(nextMulti);
+ }
else if (info == XLOG_CHECKPOINT_SHUTDOWN)
{
CheckPoint checkPoint;
@@ -5084,6 +5129,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
+ MultiXactSetNextMXact(checkPoint.nextMulti);
/*
* TLI may change in a shutdown checkpoint, but it shouldn't
@@ -5115,6 +5161,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
}
+ MultiXactAdvanceNextMXact(checkPoint.nextMulti);
/* TLI should not change in an on-line checkpoint */
if (checkPoint.ThisTimeLineID != ThisTimeLineID)
ereport(PANIC,
@@ -5139,11 +5186,12 @@ xlog_desc(char *buf, uint8 xl_info, char *rec)
CheckPoint *checkpoint = (CheckPoint *) rec;
sprintf(buf + strlen(buf), "checkpoint: redo %X/%X; undo %X/%X; "
- "tli %u; xid %u; oid %u; %s",
+ "tli %u; xid %u; oid %u; multi %u; %s",
checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
checkpoint->ThisTimeLineID, checkpoint->nextXid,
checkpoint->nextOid,
+ checkpoint->nextMulti,
(info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
}
else if (info == XLOG_NEXTOID)
@@ -5153,6 +5201,13 @@ xlog_desc(char *buf, uint8 xl_info, char *rec)
memcpy(&nextOid, rec, sizeof(Oid));
sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
}
+ else if (info == XLOG_NEXTMULTI)
+ {
+ MultiXactId multi;
+
+ memcpy(&multi, rec, sizeof(MultiXactId));
+ sprintf(buf + strlen(buf), "nextMultiXact: %u", multi);
+ }
else
strcat(buf, "UNKNOWN");
}