summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/proto.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r--src/backend/replication/logical/proto.c162
1 files changed, 156 insertions, 6 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9ff8097bf5..eb19142b48 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -138,10 +138,15 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
* Write INSERT to the output stream.
*/
void
-logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
+logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'I'); /* action INSERT */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -177,8 +182,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
* Write UPDATE to the output stream.
*/
void
-logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
- HeapTuple newtuple, bool binary)
+logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple oldtuple, HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
@@ -186,6 +191,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -247,7 +256,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
* Write DELETE to the output stream.
*/
void
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
+logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple oldtuple, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -255,6 +265,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool b
pq_sendbyte(out, 'D'); /* action DELETE */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -295,6 +309,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
*/
void
logicalrep_write_truncate(StringInfo out,
+ TransactionId xid,
int nrelids,
Oid relids[],
bool cascade, bool restart_seqs)
@@ -304,6 +319,10 @@ logicalrep_write_truncate(StringInfo out,
pq_sendbyte(out, 'T'); /* action TRUNCATE */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
pq_sendint32(out, nrelids);
/* encode and send truncate flags */
@@ -346,12 +365,16 @@ logicalrep_read_truncate(StringInfo in,
* Write relation description to the output stream.
*/
void
-logicalrep_write_rel(StringInfo out, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
{
char *relname;
pq_sendbyte(out, 'R'); /* sending RELATION */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@@ -396,7 +419,7 @@ logicalrep_read_rel(StringInfo in)
* This function will always write base type info.
*/
void
-logicalrep_write_typ(StringInfo out, Oid typoid)
+logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
{
Oid basetypoid = getBaseType(typoid);
HeapTuple tup;
@@ -404,6 +427,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid)
pq_sendbyte(out, 'Y'); /* sending TYPE */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for type %u", basetypoid);
@@ -720,3 +747,126 @@ logicalrep_read_namespace(StringInfo in)
return nspname;
}
+
+/*
+ * Write the information for the start stream message to the output stream.
+ */
+void
+logicalrep_write_stream_start(StringInfo out,
+ TransactionId xid, bool first_segment)
+{
+ pq_sendbyte(out, 'S'); /* action STREAM START */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+
+ /* 1 if this is the first streaming segment for this xid */
+ pq_sendbyte(out, first_segment ? 1 : 0);
+}
+
+/*
+ * Read the information about the start stream message from output stream.
+ */
+TransactionId
+logicalrep_read_stream_start(StringInfo in, bool *first_segment)
+{
+ TransactionId xid;
+
+ Assert(first_segment);
+
+ xid = pq_getmsgint(in, 4);
+ *first_segment = (pq_getmsgbyte(in) == 1);
+
+ return xid;
+}
+
+/*
+ * Write the stop stream message to the output stream.
+ */
+void
+logicalrep_write_stream_stop(StringInfo out)
+{
+ pq_sendbyte(out, 'E'); /* action STREAM END */
+}
+
+/*
+ * Write STREAM COMMIT to the output stream.
+ */
+void
+logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
+
+ Assert(TransactionIdIsValid(txn->xid));
+
+ /* transaction ID */
+ pq_sendint32(out, txn->xid);
+
+ /* send the flags field (unused for now) */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Read STREAM COMMIT from the output stream.
+ */
+TransactionId
+logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
+{
+ TransactionId xid;
+ uint8 flags;
+
+ xid = pq_getmsgint(in, 4);
+
+ /* read flags (unused for now) */
+ flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit message", flags);
+
+ /* read fields */
+ commit_data->commit_lsn = pq_getmsgint64(in);
+ commit_data->end_lsn = pq_getmsgint64(in);
+ commit_data->committime = pq_getmsgint64(in);
+
+ return xid;
+}
+
+/*
+ * Write STREAM ABORT to the output stream. Note that xid and subxid will be
+ * same for the top-level transaction abort.
+ */
+void
+logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+ TransactionId subxid)
+{
+ pq_sendbyte(out, 'A'); /* action STREAM ABORT */
+
+ Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
+
+ /* transaction ID */
+ pq_sendint32(out, xid);
+ pq_sendint32(out, subxid);
+}
+
+/*
+ * Read STREAM ABORT from the output stream.
+ */
+void
+logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+ TransactionId *subxid)
+{
+ Assert(xid && subxid);
+
+ *xid = pq_getmsgint(in, 4);
+ *subxid = pq_getmsgint(in, 4);
+}