summaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-02-12 07:41:51 +0530
committerAmit Kapila <akapila@postgresql.org>2021-02-12 07:41:51 +0530
commitce0fdbfe9722867b7fad4d3ede9b6a6bfc51fb4e (patch)
treebe540b24d4cc30cbbd52e92ac164239b6773a699 /src/include/replication
parent3063eb17593c3ad498ce4e89db3358862ea2dbb6 (diff)
downloadpostgresql-ce0fdbfe9722867b7fad4d3ede9b6a6bfc51fb4e.tar.gz
Allow multiple xacts during table sync in logical replication.
For the initial table data synchronization in logical replication, we use a single transaction to copy the entire table and then synchronize the position in the stream with the main apply worker. There are multiple downsides of this approach: (a) We have to perform the entire copy operation again if there is any error (network breakdown, error in the database operation, etc.) while we synchronize the WAL position between tablesync worker and apply worker; this will be onerous especially for large copies, (b) Using a single transaction in the synchronization-phase (where we can receive WAL from multiple transactions) will have the risk of exceeding the CID limit, (c) The slot will hold the WAL till the entire sync is complete because we never commit till the end. This patch solves all the above downsides by allowing multiple transactions during the tablesync phase. The initial copy is done in a single transaction and after that, we commit each transaction as we receive. To allow recovery after any error or crash, we use a permanent slot and origin to track the progress. The slot and origin will be removed once we finish the synchronization of the table. We also remove slot and origin of tablesync workers if the user performs DROP SUBSCRIPTION .. or ALTER SUBSCRIPTION .. REFERESH and some of the table syncs are still not finished. The commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh option as true cannot be executed inside a transaction block because they can now drop the slots for which we have no provision to rollback. This will also open up the path for logical replication of 2PC transactions on the subscriber side. Previously, we can't do that because of the requirement of maintaining a single transaction in tablesync workers. Bump catalog version due to change of state in the catalog (pg_subscription_rel). Author: Peter Smith, Amit Kapila, and Takamichi Osumi Reviewed-by: Ajin Cherian, Petr Jelinek, Hou Zhijie and Amit Kapila Discussion: https://postgr.es/m/CAA4eK1KHJxaZS-fod-0fey=0tq3=Gkn4ho=8N4-5HWiCfu0H1A@mail.gmail.com
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/logicallauncher.h2
-rw-r--r--src/include/replication/slot.h3
-rw-r--r--src/include/replication/walreceiver.h1
-rw-r--r--src/include/replication/worker_internal.h3
4 files changed, 6 insertions, 3 deletions
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 421ec1580d..301e494f7b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,9 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
-extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
-extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 53f636c56f..5f52335f15 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
+#include "replication/walreceiver.h"
/*
* Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 4313f516d3..a97a59a6a3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -210,6 +210,7 @@ typedef enum
typedef struct WalRcvExecResult
{
WalRcvExecStatus status;
+ int sqlstate;
char *err;
Tuplestorestate *tuplestore;
TupleDesc tupledesc;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index d046022e49..4a5adc2fda 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -77,13 +77,14 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);
+extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+
void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);