diff options
Diffstat (limited to 'src/backend/replication/logical/proto.c')
| -rw-r--r-- | src/backend/replication/logical/proto.c | 162 |
1 files changed, 156 insertions, 6 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9ff8097bf5..eb19142b48 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -138,10 +138,15 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary) +logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, + HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'I'); /* action INSERT */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -177,8 +182,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) * Write UPDATE to the output stream. */ void -logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary) +logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, HeapTuple newtuple, bool binary) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -186,6 +191,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -247,7 +256,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, * Write DELETE to the output stream. */ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary) +logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -255,6 +265,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool b pq_sendbyte(out, 'D'); /* action DELETE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -295,6 +309,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) */ void logicalrep_write_truncate(StringInfo out, + TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs) @@ -304,6 +319,10 @@ logicalrep_write_truncate(StringInfo out, pq_sendbyte(out, 'T'); /* action TRUNCATE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + pq_sendint32(out, nrelids); /* encode and send truncate flags */ @@ -346,12 +365,16 @@ logicalrep_read_truncate(StringInfo in, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; pq_sendbyte(out, 'R'); /* sending RELATION */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -396,7 +419,7 @@ logicalrep_read_rel(StringInfo in) * This function will always write base type info. */ void -logicalrep_write_typ(StringInfo out, Oid typoid) +logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) { Oid basetypoid = getBaseType(typoid); HeapTuple tup; @@ -404,6 +427,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid) pq_sendbyte(out, 'Y'); /* sending TYPE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", basetypoid); @@ -720,3 +747,126 @@ logicalrep_read_namespace(StringInfo in) return nspname; } + +/* + * Write the information for the start stream message to the output stream. + */ +void +logicalrep_write_stream_start(StringInfo out, + TransactionId xid, bool first_segment) +{ + pq_sendbyte(out, 'S'); /* action STREAM START */ + + Assert(TransactionIdIsValid(xid)); + + /* transaction ID (we're starting to stream, so must be valid) */ + pq_sendint32(out, xid); + + /* 1 if this is the first streaming segment for this xid */ + pq_sendbyte(out, first_segment ? 1 : 0); +} + +/* + * Read the information about the start stream message from output stream. + */ +TransactionId +logicalrep_read_stream_start(StringInfo in, bool *first_segment) +{ + TransactionId xid; + + Assert(first_segment); + + xid = pq_getmsgint(in, 4); + *first_segment = (pq_getmsgbyte(in) == 1); + + return xid; +} + +/* + * Write the stop stream message to the output stream. + */ +void +logicalrep_write_stream_stop(StringInfo out) +{ + pq_sendbyte(out, 'E'); /* action STREAM END */ +} + +/* + * Write STREAM COMMIT to the output stream. + */ +void +logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ + + Assert(TransactionIdIsValid(txn->xid)); + + /* transaction ID */ + pq_sendint32(out, txn->xid); + + /* send the flags field (unused for now) */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); +} + +/* + * Read STREAM COMMIT from the output stream. + */ +TransactionId +logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); + + /* read flags (unused for now) */ + flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit message", flags); + + /* read fields */ + commit_data->commit_lsn = pq_getmsgint64(in); + commit_data->end_lsn = pq_getmsgint64(in); + commit_data->committime = pq_getmsgint64(in); + + return xid; +} + +/* + * Write STREAM ABORT to the output stream. Note that xid and subxid will be + * same for the top-level transaction abort. + */ +void +logicalrep_write_stream_abort(StringInfo out, TransactionId xid, + TransactionId subxid) +{ + pq_sendbyte(out, 'A'); /* action STREAM ABORT */ + + Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); + + /* transaction ID */ + pq_sendint32(out, xid); + pq_sendint32(out, subxid); +} + +/* + * Read STREAM ABORT from the output stream. + */ +void +logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, + TransactionId *subxid) +{ + Assert(xid && subxid); + + *xid = pq_getmsgint(in, 4); + *subxid = pq_getmsgint(in, 4); +} |
