summaryrefslogtreecommitdiff
path: root/src/backend/access/transam
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--src/backend/access/transam/twophase.c59
-rw-r--r--src/backend/access/transam/xact.c475
-rw-r--r--src/backend/access/transam/xlog.c126
3 files changed, 376 insertions, 284 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6edc22704c..4075a6f743 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2079,7 +2079,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
- xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
START_CRIT_SECTION();
@@ -2088,36 +2087,11 @@ RecordTransactionCommitPrepared(TransactionId xid,
MyPgXact->delayChkpt = true;
/* Emit the XLOG commit record */
- xlrec.xid = xid;
-
- xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
-
- xlrec.crec.dbId = MyDatabaseId;
- xlrec.crec.tsId = MyDatabaseTableSpace;
-
- xlrec.crec.xact_time = GetCurrentTimestamp();
- xlrec.crec.nrels = nrels;
- xlrec.crec.nsubxacts = nchildren;
- xlrec.crec.nmsgs = ninvalmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- /* dump cache invalidation messages */
- if (ninvalmsgs > 0)
- XLogRegisterData((char *) invalmsgs,
- ninvalmsgs * sizeof(SharedInvalidationMessage));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
+ recptr = XactLogCommitRecord(GetCurrentTimestamp(),
+ nchildren, children, nrels, rels,
+ ninvalmsgs, invalmsgs,
+ initfileinval, false,
+ xid);
/*
* We don't currently try to sleep before flush here ... nor is there any
@@ -2160,7 +2134,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nrels,
RelFileNode *rels)
{
- xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
/*
@@ -2174,24 +2147,10 @@ RecordTransactionAbortPrepared(TransactionId xid,
START_CRIT_SECTION();
/* Emit the XLOG abort record */
- xlrec.xid = xid;
- xlrec.arec.xact_time = GetCurrentTimestamp();
- xlrec.arec.nrels = nrels;
- xlrec.arec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
+ recptr = XactLogAbortRecord(GetCurrentTimestamp(),
+ nchildren, children,
+ nrels, rels,
+ xid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 89769eac07..1495bb499f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1068,70 +1068,11 @@ RecordTransactionCommit(void)
SetCurrentTransactionStopTimestamp();
- /*
- * Do we need the long commit record? If not, use the compact format.
- *
- * For now always use the non-compact version if wal_level=logical, so
- * we can hide commits from other databases. TODO: In the future we
- * should merge compact and non-compact commits and use a flags
- * variable to determine if it contains subxacts, relations or
- * invalidation messages, that's more extensible and degrades more
- * gracefully. Till then, it's just 20 bytes of overhead.
- */
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
- XLogLogicalInfoActive())
- {
- xl_xact_commit xlrec;
-
- /*
- * Set flags required for recovery processing of commits.
- */
- xlrec.xinfo = 0;
- if (RelcacheInitFileInval)
- xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
- if (forceSyncCommit)
- xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
- xlrec.dbId = MyDatabaseId;
- xlrec.tsId = MyDatabaseTableSpace;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- xlrec.nmsgs = nmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels,
- nrels * sizeof(RelFileNode));
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
- /* dump shared cache invalidation messages */
- if (nmsgs > 0)
- XLogRegisterData((char *) invalMessages,
- nmsgs * sizeof(SharedInvalidationMessage));
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
- }
- else
- {
- xl_xact_commit_compact xlrec;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
- }
+ XactLogCommitRecord(xactStopTimestamp,
+ nchildren, children, nrels, rels,
+ nmsgs, invalMessages,
+ RelcacheInitFileInval, forceSyncCommit,
+ InvalidTransactionId /* plain commit */);
}
/*
@@ -1424,7 +1365,7 @@ RecordTransactionAbort(bool isSubXact)
RelFileNode *rels;
int nchildren;
TransactionId *children;
- xl_xact_abort xlrec;
+ TimestampTz xact_time;
/*
* If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1464,28 +1405,17 @@ RecordTransactionAbort(bool isSubXact)
/* Write the ABORT record */
if (isSubXact)
- xlrec.xact_time = GetCurrentTimestamp();
+ xact_time = GetCurrentTimestamp();
else
{
SetCurrentTransactionStopTimestamp();
- xlrec.xact_time = xactStopTimestamp;
+ xact_time = xactStopTimestamp;
}
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
+ XactLogAbortRecord(xact_time,
+ nchildren, children,
+ nrels, rels,
+ InvalidTransactionId);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -4659,23 +4589,229 @@ xactGetCommittedChildren(TransactionId **ptr)
* XLOG support routines
*/
+
+/*
+ * Log the commit record for a plain or twophase transaction commit.
+ *
+ * A 2pc commit will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync,
+ TransactionId twophase_xid)
+{
+ xl_xact_commit xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_dbinfo xl_dbinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_invals xl_invals;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc commit */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_COMMIT;
+ else
+ info = XLOG_XACT_COMMIT_PREPARED;
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = commit_time;
+
+ if (relcacheInval)
+ xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+ if (forceSyncCommit)
+ xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+ /*
+ * Relcache invalidations requires information about the current database
+ * and so does logical decoding.
+ */
+ if (nmsgs > 0 || XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (nmsgs > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
+ xl_invals.nmsgs = nmsgs;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the commit record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+ XLogRegisterData((char *) msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
+/*
+ * Log the commit record for a plain or twophase transaction abort.
+ *
+ * A 2pc abort will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ TransactionId twophase_xid)
+{
+ xl_xact_abort xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc abort */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_ABORT;
+ else
+ info = XLOG_XACT_ABORT_PREPARED;
+
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = abort_time;
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the abort record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
/*
* Before 9.0 this was a fairly short function, but now it performs many
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TimestampTz commit_time,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit(xl_xact_parsed_commit *parsed,
+ TransactionId xid,
+ XLogRecPtr lsn)
{
TransactionId max_xid;
int i;
- max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+ max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4694,15 +4830,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Set the transaction commit timestamp and metadata */
- TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
- commit_time, InvalidCommitTsNodeId, false);
+ TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
+ parsed->xact_time, InvalidCommitTsNodeId,
+ false);
if (standbyState == STANDBY_DISABLED)
{
/*
* Mark the transaction committed in pg_clog.
*/
- TransactionIdCommitTree(xid, nsubxacts, sub_xids);
+ TransactionIdCommitTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
@@ -4726,21 +4863,24 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
- TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
+ TransactionIdAsyncCommitTree(
+ xid, parsed->nsubxacts, parsed->subxacts, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
- ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
- XactCompletionRelcacheInitFileInval(xinfo),
- dbId, tsId);
+ ProcessCommittedInvalidationMessages(
+ parsed->msgs, parsed->nmsgs,
+ XactCompletionRelcacheInitFileInval(parsed->xinfo),
+ parsed->dbId, parsed->tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
@@ -4753,7 +4893,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
}
/* Make sure files supposed to be dropped are dropped */
- if (nrels > 0)
+ if (parsed->nrels > 0)
{
/*
* First update minimum recovery point to cover this WAL record. Once
@@ -4772,13 +4912,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
*/
XLogFlush(lsn);
- for (i = 0; i < nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4796,52 +4936,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* minRecoveryPoint during recovery) helps to reduce that problem window,
* for any user that requested ForceSyncCommit().
*/
- if (XactCompletionForceSyncCommit(xinfo))
+ if (XactCompletionForceSyncCommit(parsed->xinfo))
XLogFlush(lsn);
}
/*
- * Utility function to call xact_redo_commit_internal after breaking down xlrec
- */
-static void
-xact_redo_commit(xl_xact_commit *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- TransactionId *subxacts;
- SharedInvalidationMessage *inval_msgs;
-
- /* subxid array follows relfilenodes */
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- /* invalidation messages array follows subxids */
- inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- subxacts, xlrec->nsubxacts,
- inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
- xlrec->dbId,
- xlrec->tsId,
- xlrec->xinfo);
-}
-
-/*
- * Utility function to call xact_redo_commit_internal for compact form of message.
- */
-static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- xlrec->subxacts, xlrec->nsubxacts,
- NULL, 0, /* inval msgs */
- NULL, 0, /* relfilenodes */
- InvalidOid, /* dbId */
- InvalidOid, /* tsId */
- 0); /* xinfo */
-}
-
-/*
* Be careful with the order of execution, as with xact_redo_commit().
* The two functions are similar but differ in key places.
*
@@ -4851,14 +4951,10 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
* because subtransaction commit is never WAL logged.
*/
static void
-xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
{
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+ int i;
+ TransactionId max_xid;
/*
* Make sure nextXid is beyond any XID mentioned in the record.
@@ -4867,6 +4963,10 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
* hold a lock while checking this. We still acquire the lock to modify
* it, though.
*/
+ max_xid = TransactionIdLatest(xid,
+ parsed->nsubxacts,
+ parsed->subxacts);
+
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
@@ -4879,7 +4979,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
if (standbyState == STANDBY_DISABLED)
{
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
@@ -4895,12 +4995,13 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
RecordKnownAssignedTransactionIds(max_xid);
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
/*
* We must update the ProcArray after we have marked clog.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* There are no flat files that need updating, nor invalidation
@@ -4910,17 +5011,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
/*
* Release locks, if any. There are no invalidations to send.
*/
- StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+ StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
}
/* Make sure files supposed to be dropped are dropped */
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xlrec->xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
@@ -4929,28 +5030,52 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
void
xact_redo(XLogReaderState *record)
{
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
/* Backup blocks are not used in xact records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
-
- xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
+ if (info == XLOG_XACT_COMMIT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, XLogRecGetXid(record),
+ record->EndRecPtr);
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, parsed.twophase_xid,
+ record->EndRecPtr);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
- else if (info == XLOG_XACT_ABORT)
+ else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_abort(xlrec, XLogRecGetXid(record));
+ if (info == XLOG_XACT_ABORT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, XLogRecGetXid(record));
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, parsed.twophase_xid);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
else if (info == XLOG_XACT_PREPARE)
{
@@ -4958,20 +5083,6 @@ xact_redo(XLogReaderState *record)
RecreateTwoPhaseFile(XLogRecGetXid(record),
XLogRecGetData(record), XLogRecGetDataLen(record));
}
- else if (info == XLOG_XACT_COMMIT_PREPARED)
- {
- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
-
- xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
- else if (info == XLOG_XACT_ABORT_PREPARED)
- {
- xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
-
- xact_redo_abort(&xlrec->arec, xlrec->xid);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 72af2c4330..e2d187f74d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5168,39 +5168,27 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
static bool
getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
{
- uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 xact_info = info & XLOG_XACT_OPMASK;
uint8 rmid = XLogRecGetRmid(record);
- if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
- {
- *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED))
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
- {
- *recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED))
{
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
- {
- *recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
- return true;
- }
return false;
}
@@ -5216,7 +5204,7 @@ static bool
recoveryStopsBefore(XLogReaderState *record)
{
bool stopsHere = false;
- uint8 record_info;
+ uint8 xact_info;
bool isCommit;
TimestampTz recordXtime = 0;
TransactionId recordXid;
@@ -5237,27 +5225,40 @@ recoveryStopsBefore(XLogReaderState *record)
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_COMMIT_PREPARED)
+ else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
isCommit = true;
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
- else if (record_info == XLOG_XACT_ABORT)
+ else if (xact_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
- isCommit = false;
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ isCommit = true;
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
else
return false;
@@ -5325,11 +5326,12 @@ recoveryStopsBefore(XLogReaderState *record)
static bool
recoveryStopsAfter(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 info;
+ uint8 xact_info;
uint8 rmid;
TimestampTz recordXtime;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
rmid = XLogRecGetRmid(record);
/*
@@ -5337,7 +5339,7 @@ recoveryStopsAfter(XLogReaderState *record)
* the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
- rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
@@ -5358,12 +5360,15 @@ recoveryStopsAfter(XLogReaderState *record)
}
}
- if (rmid == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED ||
- record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED))
+ if (rmid != RM_XACT_ID)
+ return false;
+
+ xact_info = info & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED ||
+ xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
TransactionId recordXid;
@@ -5372,10 +5377,26 @@ recoveryStopsAfter(XLogReaderState *record)
SetLatestXTime(recordXtime);
/* Extract the XID of the committed/aborted transaction */
- if (record_info == XLOG_XACT_COMMIT_PREPARED)
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ if (xact_info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
else
recordXid = XLogRecGetXid(record);
@@ -5396,17 +5417,16 @@ recoveryStopsAfter(XLogReaderState *record)
recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0';
- if (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
- else if (record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
@@ -5494,7 +5514,7 @@ SetRecoveryPause(bool recoveryPause)
static bool
recoveryApplyDelay(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 xact_info;
TimestampTz xtime;
long secs;
int microsecs;
@@ -5511,11 +5531,13 @@ recoveryApplyDelay(XLogReaderState *record)
* so there is already opportunity for issues caused by early conflicts on
* standbys.
*/
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)))
+ if (XLogRecGetRmid(record) != RM_XACT_ID)
+ return false;
+
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_COMMIT;
+
+ if (xact_info != XLOG_XACT_COMMIT &&
+ xact_info != XLOG_XACT_COMMIT_PREPARED)
return false;
if (!getRecordTimestamp(record, &xtime))