summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c952
1 files changed, 947 insertions, 5 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b576e342cb..812aca8011 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -18,11 +18,45 @@
* This module includes server facing code and shares libpqwalreceiver
* module with walreceiver for providing the libpq specific functionality.
*
+ *
+ * STREAMED TRANSACTIONS
+ * ---------------------
+ * Streamed transactions (large transactions exceeding a memory limit on the
+ * upstream) are not applied immediately, but instead, the data is written
+ * to temporary files and then applied at once when the final commit arrives.
+ *
+ * Unlike the regular (non-streamed) case, handling streamed transactions has
+ * to handle aborts of both the toplevel transaction and subtransactions. This
+ * is achieved by tracking offsets for subtransactions, which is then used
+ * to truncate the file with serialized changes.
+ *
+ * The files are placed in tmp file directory by default, and the filenames
+ * include both the XID of the toplevel transaction and OID of the
+ * subscription. This is necessary so that different workers processing a
+ * remote transaction with the same XID doesn't interfere.
+ *
+ * We use BufFiles instead of using normal temporary files because (a) the
+ * BufFile infrastructure supports temporary files that exceed the OS file size
+ * limit, (b) provides a way for automatic clean up on the error and (c) provides
+ * a way to survive these files across local transactions and allow to open and
+ * close at stream start and close. We decided to use SharedFileSet
+ * infrastructure as without that it deletes the files on the closure of the
+ * file and if we decide to keep stream files open across the start/stop stream
+ * then it will consume a lot of memory (more than 8K for each BufFile and
+ * there could be multiple such BufFiles as the subscriber could receive
+ * multiple start/stop streams for different transactions before getting the
+ * commit). Moreover, if we don't use SharedFileSet then we also need to invent
+ * a new way to pass filenames to BufFile APIs so that we are allowed to open
+ * the file we desired across multiple stream-open calls for the same
+ * transaction.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <sys/stat.h>
+#include <unistd.h>
+
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
@@ -33,7 +67,9 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
+#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
@@ -63,7 +99,9 @@
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/buffile.h"
#include "storage/bufmgr.h"
+#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
@@ -71,6 +109,7 @@
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
+#include "utils/dynahash.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
@@ -99,9 +138,26 @@ typedef struct SlotErrCallbackArg
int remote_attnum;
} SlotErrCallbackArg;
+/*
+ * Stream xid hash entry. Whenever we see a new xid we create this entry in the
+ * xidhash and along with it create the streaming file and store the fileset handle.
+ * The subxact file is created iff there is any subxact info under this xid. This
+ * entry is used on the subsequent streams for the xid to get the corresponding
+ * fileset handles, so storing them in hash makes the search faster.
+ */
+typedef struct StreamXidHash
+{
+ TransactionId xid; /* xid is the hash key and must be first */
+ SharedFileSet *stream_fileset; /* shared file set for stream data */
+ SharedFileSet *subxact_fileset; /* shared file set for subxact info */
+} StreamXidHash;
+
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
+/* per stream context for streaming transactions */
+static MemoryContext LogicalStreamingContext = NULL;
+
WalReceiverConn *wrconn = NULL;
Subscription *MySubscription = NULL;
@@ -110,12 +166,66 @@ bool MySubscriptionValid = false;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+/* fields valid only when processing streamed transaction */
+bool in_streamed_transaction = false;
+
+static TransactionId stream_xid = InvalidTransactionId;
+
+/*
+ * Hash table for storing the streaming xid information along with shared file
+ * set for streaming and subxact files.
+ */
+static HTAB *xidhash = NULL;
+
+/* BufFile handle of the current streaming file */
+static BufFile *stream_fd = NULL;
+
+typedef struct SubXactInfo
+{
+ TransactionId xid; /* XID of the subxact */
+ int fileno; /* file number in the buffile */
+ off_t offset; /* offset in the file */
+} SubXactInfo;
+
+/* Sub-transaction data for the current streaming transaction */
+typedef struct ApplySubXactData
+{
+ uint32 nsubxacts; /* number of sub-transactions */
+ uint32 nsubxacts_max; /* current capacity of subxacts */
+ TransactionId subxact_last; /* xid of the last sub-transaction */
+ SubXactInfo *subxacts; /* sub-xact offset in changes file */
+} ApplySubXactData;
+
+static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
+
+static void subxact_filename(char *path, Oid subid, TransactionId xid);
+static void changes_filename(char *path, Oid subid, TransactionId xid);
+
+/*
+ * Information about subtransactions of a given toplevel transaction.
+ */
+static void subxact_info_write(Oid subid, TransactionId xid);
+static void subxact_info_read(Oid subid, TransactionId xid);
+static void subxact_info_add(TransactionId xid);
+static inline void cleanup_subxact_info(void);
+
+/*
+ * Serialize and deserialize changes for a toplevel transaction.
+ */
+static void stream_cleanup_files(Oid subid, TransactionId xid);
+static void stream_open_file(Oid subid, TransactionId xid, bool first);
+static void stream_write_change(char action, StringInfo s);
+static void stream_close_file(void);
+
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn);
static void maybe_reread_subscription(void);
+/* prototype needed because of stream_commit */
+static void apply_dispatch(StringInfo s);
+
static void apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ResultRelInfo *relinfo,
@@ -187,6 +297,42 @@ ensure_transaction(void)
return true;
}
+/*
+ * Handle streamed transactions.
+ *
+ * If in streaming mode (receiving a block of streamed transaction), we
+ * simply redirect it to a file for the proper toplevel transaction.
+ *
+ * Returns true for streamed transactions, false otherwise (regular mode).
+ */
+static bool
+handle_streamed_transaction(const char action, StringInfo s)
+{
+ TransactionId xid;
+
+ /* not in streaming mode */
+ if (!in_streamed_transaction)
+ return false;
+
+ Assert(stream_fd != NULL);
+ Assert(TransactionIdIsValid(stream_xid));
+
+ /*
+ * We should have received XID of the subxact as the first part of the
+ * message, so extract it.
+ */
+ xid = pq_getmsgint(s, 4);
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* Add the new subxact to the array (unless already there). */
+ subxact_info_add(xid);
+
+ /* write the change to the current file */
+ stream_write_change(action, s);
+
+ return true;
+}
/*
* Executor state preparation for evaluation of constraint expressions,
@@ -612,17 +758,336 @@ static void
apply_handle_origin(StringInfo s)
{
/*
- * ORIGIN message can only come inside remote transaction and before any
- * actual writes.
+ * ORIGIN message can only come inside streaming transaction or inside
+ * remote transaction and before any actual writes.
*/
- if (!in_remote_transaction ||
- (IsTransactionState() && !am_tablesync_worker()))
+ if (!in_streamed_transaction &&
+ (!in_remote_transaction ||
+ (IsTransactionState() && !am_tablesync_worker())))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("ORIGIN message sent out of order")));
}
/*
+ * Handle STREAM START message.
+ */
+static void
+apply_handle_stream_start(StringInfo s)
+{
+ bool first_segment;
+ HASHCTL hash_ctl;
+
+ Assert(!in_streamed_transaction);
+
+ /*
+ * Start a transaction on stream start, this transaction will be committed
+ * on the stream stop. We need the transaction for handling the buffile,
+ * used for serializing the streaming data and subxact info.
+ */
+ ensure_transaction();
+
+ /* notify handle methods we're processing a remote transaction */
+ in_streamed_transaction = true;
+
+ /* extract XID of the top-level transaction */
+ stream_xid = logicalrep_read_stream_start(s, &first_segment);
+
+ /*
+ * Initialize the xidhash table if we haven't yet. This will be used for
+ * the entire duration of the apply worker so create it in permanent
+ * context.
+ */
+ if (xidhash == NULL)
+ {
+ hash_ctl.keysize = sizeof(TransactionId);
+ hash_ctl.entrysize = sizeof(StreamXidHash);
+ hash_ctl.hcxt = ApplyContext;
+ xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
+ HASH_ELEM | HASH_CONTEXT);
+ }
+
+ /* open the spool file for this transaction */
+ stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+
+ /* if this is not the first segment, open existing subxact file */
+ if (!first_segment)
+ subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle STREAM STOP message.
+ */
+static void
+apply_handle_stream_stop(StringInfo s)
+{
+ Assert(in_streamed_transaction);
+
+ /*
+ * Close the file with serialized changes, and serialize information about
+ * subxacts for the toplevel transaction.
+ */
+ subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
+ stream_close_file();
+
+ /* We must be in a valid transaction state */
+ Assert(IsTransactionState());
+
+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();
+
+ in_streamed_transaction = false;
+
+ /* Reset per-stream context */
+ MemoryContextReset(LogicalStreamingContext);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle STREAM abort message.
+ */
+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+
+ Assert(!in_streamed_transaction);
+
+ logicalrep_read_stream_abort(s, &xid, &subxid);
+
+ /*
+ * If the two XIDs are the same, it's in fact abort of toplevel xact, so
+ * just delete the files with serialized info.
+ */
+ if (xid == subxid)
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ else
+ {
+ /*
+ * OK, so it's a subxact. We need to read the subxact file for the
+ * toplevel transaction, determine the offset tracked for the subxact,
+ * and truncate the file with changes. We also remove the subxacts
+ * with higher offsets (or rather higher XIDs).
+ *
+ * We intentionally scan the array from the tail, because we're likely
+ * aborting a change for the most recent subtransactions.
+ *
+ * We can't use the binary search here as subxact XIDs won't
+ * necessarily arrive in sorted order, consider the case where we have
+ * released the savepoint for multiple subtransactions and then
+ * performed rollback to savepoint for one of the earlier
+ * sub-transaction.
+ */
+
+ int64 i;
+ int64 subidx;
+ BufFile *fd;
+ bool found = false;
+ char path[MAXPGPATH];
+ StreamXidHash *ent;
+
+ subidx = -1;
+ ensure_transaction();
+ subxact_info_read(MyLogicalRepWorker->subid, xid);
+
+ for (i = subxact_data.nsubxacts; i > 0; i--)
+ {
+ if (subxact_data.subxacts[i - 1].xid == subxid)
+ {
+ subidx = (i - 1);
+ found = true;
+ break;
+ }
+ }
+
+ /*
+ * If it's an empty sub-transaction then we will not find the subxid
+ * here so just cleanup the subxact info and return.
+ */
+ if (!found)
+ {
+ /* Cleanup the subxact info */
+ cleanup_subxact_info();
+ CommitTransactionCommand();
+ return;
+ }
+
+ Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
+
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+ Assert(found);
+
+ /* open the changes file */
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+ fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+
+ /* OK, truncate the file at the right offset */
+ BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
+ subxact_data.subxacts[subidx].offset);
+ BufFileClose(fd);
+
+ /* discard the subxacts added later */
+ subxact_data.nsubxacts = subidx;
+
+ /* write the updated subxact list */
+ subxact_info_write(MyLogicalRepWorker->subid, xid);
+ CommitTransactionCommand();
+ }
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ TransactionId xid;
+ StringInfoData s2;
+ int nchanges;
+ char path[MAXPGPATH];
+ char *buffer = NULL;
+ bool found;
+ LogicalRepCommitData commit_data;
+ StreamXidHash *ent;
+ MemoryContext oldcxt;
+ BufFile *fd;
+
+ Assert(!in_streamed_transaction);
+
+ xid = logicalrep_read_stream_commit(s, &commit_data);
+
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+ ensure_transaction();
+
+ /*
+ * Allocate file handle and memory required to process all the messages in
+ * TopTransactionContext to avoid them getting reset after each message is
+ * processed.
+ */
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* open the spool file for the committed transaction */
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+ elog(DEBUG1, "replaying changes from file \"%s\"", path);
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+ Assert(found);
+ fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+
+ buffer = palloc(BLCKSZ);
+ initStringInfo(&s2);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ remote_final_lsn = commit_data.commit_lsn;
+
+ /*
+ * Make sure the handle apply_dispatch methods are aware we're in a remote
+ * transaction.
+ */
+ in_remote_transaction = true;
+ pgstat_report_activity(STATE_RUNNING, NULL);
+
+ /*
+ * Read the entries one by one and pass them through the same logic as in
+ * apply_dispatch.
+ */
+ nchanges = 0;
+ while (true)
+ {
+ int nbytes;
+ int len;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* read length of the on-disk record */
+ nbytes = BufFileRead(fd, &len, sizeof(len));
+
+ /* have we reached end of the file? */
+ if (nbytes == 0)
+ break;
+
+ /* do we have a correct length? */
+ if (nbytes != sizeof(len))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's changes file \"%s\": %m",
+ path)));
+
+ Assert(len > 0);
+
+ /* make sure we have sufficiently large buffer */
+ buffer = repalloc(buffer, len);
+
+ /* and finally read the data into the buffer */
+ if (BufFileRead(fd, buffer, len) != len)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's changes file \"%s\": %m",
+ path)));
+
+ /* copy the buffer to the stringinfo and call apply_dispatch */
+ resetStringInfo(&s2);
+ appendBinaryStringInfo(&s2, buffer, len);
+
+ /* Ensure we are reading the data into our memory context. */
+ oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
+
+ apply_dispatch(&s2);
+
+ MemoryContextReset(ApplyMessageContext);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ nchanges++;
+
+ if (nchanges % 1000 == 0)
+ elog(DEBUG1, "replayed %d changes from file '%s'",
+ nchanges, path);
+ }
+
+ BufFileClose(fd);
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data.end_lsn;
+ replorigin_session_origin_timestamp = commit_data.committime;
+
+ pfree(buffer);
+ pfree(s2.data);
+
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(commit_data.end_lsn);
+
+ elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
+ nchanges, path);
+
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data.end_lsn);
+
+ /* unlink the files with serialized changes and subxact info */
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
* Handle RELATION message.
*
* Note we don't do validation against local schema here. The validation
@@ -635,6 +1100,9 @@ apply_handle_relation(StringInfo s)
{
LogicalRepRelation *rel;
+ if (handle_streamed_transaction('R', s))
+ return;
+
rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);
}
@@ -650,6 +1118,9 @@ apply_handle_type(StringInfo s)
{
LogicalRepTyp typ;
+ if (handle_streamed_transaction('Y', s))
+ return;
+
logicalrep_read_typ(s, &typ);
logicalrep_typmap_update(&typ);
}
@@ -686,6 +1157,9 @@ apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ if (handle_streamed_transaction('I', s))
+ return;
+
ensure_transaction();
relid = logicalrep_read_insert(s, &newtup);
@@ -801,6 +1275,9 @@ apply_handle_update(StringInfo s)
RangeTblEntry *target_rte;
MemoryContext oldctx;
+ if (handle_streamed_transaction('U', s))
+ return;
+
ensure_transaction();
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
@@ -950,6 +1427,9 @@ apply_handle_delete(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ if (handle_streamed_transaction('D', s))
+ return;
+
ensure_transaction();
relid = logicalrep_read_delete(s, &oldtup);
@@ -1320,6 +1800,9 @@ apply_handle_truncate(StringInfo s)
List *relids_logged = NIL;
ListCell *lc;
+ if (handle_streamed_transaction('T', s))
+ return;
+
ensure_transaction();
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@@ -1458,6 +1941,22 @@ apply_dispatch(StringInfo s)
case 'O':
apply_handle_origin(s);
break;
+ /* STREAM START */
+ case 'S':
+ apply_handle_stream_start(s);
+ break;
+ /* STREAM END */
+ case 'E':
+ apply_handle_stream_stop(s);
+ break;
+ /* STREAM ABORT */
+ case 'A':
+ apply_handle_stream_abort(s);
+ break;
+ /* STREAM COMMIT */
+ case 'c':
+ apply_handle_stream_commit(s);
+ break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1570,6 +2069,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"ApplyMessageContext",
ALLOCSET_DEFAULT_SIZES);
+ /*
+ * This memory context is used for per-stream data when the streaming mode
+ * is enabled. This context is reset on each stream stop.
+ */
+ LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
+ "LogicalStreamingContext",
+ ALLOCSET_DEFAULT_SIZES);
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1674,7 +2181,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* confirm all writes so far */
send_feedback(last_received, false, false);
- if (!in_remote_transaction)
+ if (!in_remote_transaction && !in_streamed_transaction)
{
/*
* If we didn't get any transactions for a while there might be
@@ -1938,6 +2445,7 @@ maybe_reread_subscription(void)
strcmp(newsub->name, MySubscription->name) != 0 ||
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
+ newsub->stream != MySubscription->stream ||
!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
@@ -1979,6 +2487,439 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
MySubscriptionValid = false;
}
+/*
+ * subxact_info_write
+ * Store information about subxacts for a toplevel transaction.
+ *
+ * For each subxact we store offset of it's first change in the main file.
+ * The file is always over-written as a whole.
+ *
+ * XXX We should only store subxacts that were not aborted yet.
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ bool found;
+ Size len;
+ StreamXidHash *ent;
+ BufFile *fd;
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* find the xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+ /* we must found the entry for its top transaction by this time */
+ Assert(found);
+
+ /*
+ * If there is no subtransaction then nothing to do, but if already have
+ * subxact file then delete that.
+ */
+ if (subxact_data.nsubxacts == 0)
+ {
+ if (ent->subxact_fileset)
+ {
+ cleanup_subxact_info();
+ SharedFileSetDeleteAll(ent->subxact_fileset);
+ pfree(ent->subxact_fileset);
+ ent->subxact_fileset = NULL;
+ }
+ return;
+ }
+
+ subxact_filename(path, subid, xid);
+
+ /*
+ * Create the subxact file if it not already created, otherwise open the
+ * existing file.
+ */
+ if (ent->subxact_fileset == NULL)
+ {
+ MemoryContext oldctx;
+
+ /*
+ * We need to maintain shared fileset across multiple stream
+ * start/stop calls. So, need to allocate it in a persistent context.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ ent->subxact_fileset = palloc(sizeof(SharedFileSet));
+ SharedFileSetInit(ent->subxact_fileset, NULL);
+ MemoryContextSwitchTo(oldctx);
+
+ fd = BufFileCreateShared(ent->subxact_fileset, path);
+ }
+ else
+ fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+
+ len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+ /* Write the subxact count and subxact info */
+ BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
+ BufFileWrite(fd, subxact_data.subxacts, len);
+
+ BufFileClose(fd);
+
+ /* free the memory allocated for subxact info */
+ cleanup_subxact_info();
+}
+
+/*
+ * subxact_info_read
+ * Restore information about subxacts of a streamed transaction.
+ *
+ * Read information about subxacts into the structure subxact_data that can be
+ * used later.
+ */
+static void
+subxact_info_read(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ bool found;
+ Size len;
+ BufFile *fd;
+ StreamXidHash *ent;
+ MemoryContext oldctx;
+
+ Assert(TransactionIdIsValid(xid));
+ Assert(!subxact_data.subxacts);
+ Assert(subxact_data.nsubxacts == 0);
+ Assert(subxact_data.nsubxacts_max == 0);
+
+ /* Find the stream xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+
+ /*
+ * If subxact_fileset is not valid that mean we don't have any subxact
+ * info
+ */
+ if (ent->subxact_fileset == NULL)
+ return;
+
+ subxact_filename(path, subid, xid);
+
+ fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+
+ /* read number of subxact items */
+ if (BufFileRead(fd, &subxact_data.nsubxacts,
+ sizeof(subxact_data.nsubxacts)) !=
+ sizeof(subxact_data.nsubxacts))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+ path)));
+
+ len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+ /* we keep the maximum as a power of 2 */
+ subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
+
+ /*
+ * Allocate subxact information in the logical streaming context. We need
+ * this information during the complete stream so that we can add the sub
+ * transaction info to this. On stream stop we will flush this information
+ * to the subxact file and reset the logical streaming context.
+ */
+ oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+ subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
+ sizeof(SubXactInfo));
+ MemoryContextSwitchTo(oldctx);
+
+ if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+ path)));
+
+ BufFileClose(fd);
+}
+
+/*
+ * subxact_info_add
+ * Add information about a subxact (offset in the main file).
+ */
+static void
+subxact_info_add(TransactionId xid)
+{
+ SubXactInfo *subxacts = subxact_data.subxacts;
+ int64 i;
+
+ /* We must have a valid top level stream xid and a stream fd. */
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ /*
+ * If the XID matches the toplevel transaction, we don't want to add it.
+ */
+ if (stream_xid == xid)
+ return;
+
+ /*
+ * In most cases we're checking the same subxact as we've already seen in
+ * the last call, so make sure to ignore it (this change comes later).
+ */
+ if (subxact_data.subxact_last == xid)
+ return;
+
+ /* OK, remember we're processing this XID. */
+ subxact_data.subxact_last = xid;
+
+ /*
+ * Check if the transaction is already present in the array of subxact. We
+ * intentionally scan the array from the tail, because we're likely adding
+ * a change for the most recent subtransactions.
+ *
+ * XXX Can we rely on the subxact XIDs arriving in sorted order? That
+ * would allow us to use binary search here.
+ */
+ for (i = subxact_data.nsubxacts; i > 0; i--)
+ {
+ /* found, so we're done */
+ if (subxacts[i - 1].xid == xid)
+ return;
+ }
+
+ /* This is a new subxact, so we need to add it to the array. */
+ if (subxact_data.nsubxacts == 0)
+ {
+ MemoryContext oldctx;
+
+ subxact_data.nsubxacts_max = 128;
+
+ /*
+ * Allocate this memory for subxacts in per-stream context, see
+ * subxact_info_read.
+ */
+ oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+ subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+ MemoryContextSwitchTo(oldctx);
+ }
+ else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
+ {
+ subxact_data.nsubxacts_max *= 2;
+ subxacts = repalloc(subxacts,
+ subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+ }
+
+ subxacts[subxact_data.nsubxacts].xid = xid;
+
+ /*
+ * Get the current offset of the stream file and store it as offset of
+ * this subxact.
+ */
+ BufFileTell(stream_fd,
+ &subxacts[subxact_data.nsubxacts].fileno,
+ &subxacts[subxact_data.nsubxacts].offset);
+
+ subxact_data.nsubxacts++;
+ subxact_data.subxacts = subxacts;
+}
+
+/* format filename for file containing the info about subxacts */
+static void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+ snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
+}
+
+/* format filename for file containing serialized changes */
+static inline void
+changes_filename(char *path, Oid subid, TransactionId xid)
+{
+ snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
+}
+
+/*
+ * stream_cleanup_files
+ * Cleanup files for a subscription / toplevel transaction.
+ *
+ * Remove files with serialized changes and subxact info for a particular
+ * toplevel transaction. Each subscription has a separate set of files.
+ */
+static void
+stream_cleanup_files(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ StreamXidHash *ent;
+
+ /* Remove the xid entry from the stream xid hash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_REMOVE,
+ NULL);
+ /* By this time we must have created the transaction entry */
+ Assert(ent != NULL);
+
+ /* Delete the change file and release the stream fileset memory */
+ changes_filename(path, subid, xid);
+ SharedFileSetDeleteAll(ent->stream_fileset);
+ pfree(ent->stream_fileset);
+ ent->stream_fileset = NULL;
+
+ /* Delete the subxact file and release the memory, if it exist */
+ if (ent->subxact_fileset)
+ {
+ subxact_filename(path, subid, xid);
+ SharedFileSetDeleteAll(ent->subxact_fileset);
+ pfree(ent->subxact_fileset);
+ ent->subxact_fileset = NULL;
+ }
+}
+
+/*
+ * stream_open_file
+ * Open a file that we'll use to serialize changes for a toplevel
+ * transaction.
+ *
+ * Open a file for streamed changes from a toplevel transaction identified
+ * by stream_xid (global variable). If it's the first chunk of streamed
+ * changes for this transaction, initialize the shared fileset and create the
+ * buffile, otherwise open the previously created file.
+ *
+ * This can only be called at the beginning of a "streaming" block, i.e.
+ * between stream_start/stream_stop messages from the upstream.
+ */
+static void
+stream_open_file(Oid subid, TransactionId xid, bool first_segment)
+{
+ char path[MAXPGPATH];
+ bool found;
+ MemoryContext oldcxt;
+ StreamXidHash *ent;
+
+ Assert(in_streamed_transaction);
+ Assert(OidIsValid(subid));
+ Assert(TransactionIdIsValid(xid));
+ Assert(stream_fd == NULL);
+
+ /* create or find the xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_ENTER | HASH_FIND,
+ &found);
+ Assert(first_segment || found);
+ changes_filename(path, subid, xid);
+ elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
+
+ /*
+ * Create/open the buffiles under the logical streaming context so that we
+ * have those files until stream stop.
+ */
+ oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
+
+ /*
+ * If this is the first streamed segment, the file must not exist, so make
+ * sure we're the ones creating it. Otherwise just open the file for
+ * writing, in append mode.
+ */
+ if (first_segment)
+ {
+ MemoryContext savectx;
+ SharedFileSet *fileset;
+
+ /*
+ * We need to maintain shared fileset across multiple stream
+ * start/stop calls. So, need to allocate it in a persistent context.
+ */
+ savectx = MemoryContextSwitchTo(ApplyContext);
+ fileset = palloc(sizeof(SharedFileSet));
+
+ SharedFileSetInit(fileset, NULL);
+ MemoryContextSwitchTo(savectx);
+
+ stream_fd = BufFileCreateShared(fileset, path);
+
+ /* Remember the fileset for the next stream of the same transaction */
+ ent->xid = xid;
+ ent->stream_fileset = fileset;
+ ent->subxact_fileset = NULL;
+ }
+ else
+ {
+ /*
+ * Open the file and seek to the end of the file because we always
+ * append the changes file.
+ */
+ stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+ BufFileSeek(stream_fd, 0, 0, SEEK_END);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * stream_close_file
+ * Close the currently open file with streamed changes.
+ *
+ * This can only be called at the end of a streaming block, i.e. at stream_stop
+ * message from the upstream.
+ */
+static void
+stream_close_file(void)
+{
+ Assert(in_streamed_transaction);
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ BufFileClose(stream_fd);
+
+ stream_xid = InvalidTransactionId;
+ stream_fd = NULL;
+}
+
+/*
+ * stream_write_change
+ * Serialize a change to a file for the current toplevel transaction.
+ *
+ * The change is serialized in a simple format, with length (not including
+ * the length), action code (identifying the message type) and message
+ * contents (without the subxact TransactionId value).
+ */
+static void
+stream_write_change(char action, StringInfo s)
+{
+ int len;
+
+ Assert(in_streamed_transaction);
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ /* total on-disk size, including the action type character */
+ len = (s->len - s->cursor) + sizeof(char);
+
+ /* first write the size */
+ BufFileWrite(stream_fd, &len, sizeof(len));
+
+ /* then the action */
+ BufFileWrite(stream_fd, &action, sizeof(action));
+
+ /* and finally the remaining part of the buffer (after the XID) */
+ len = (s->len - s->cursor);
+
+ BufFileWrite(stream_fd, &s->data[s->cursor], len);
+}
+
+/*
+ * Cleanup the memory for subxacts and reset the related variables.
+ */
+static inline void
+cleanup_subxact_info()
+{
+ if (subxact_data.subxacts)
+ pfree(subxact_data.subxacts);
+
+ subxact_data.subxacts = NULL;
+ subxact_data.subxact_last = InvalidTransactionId;
+ subxact_data.nsubxacts = 0;
+ subxact_data.nsubxacts_max = 0;
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -2151,6 +3092,7 @@ ApplyWorkerMain(Datum main_arg)
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
+ options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */
walrcv_startstreaming(wrconn, &options);