summaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/logical.h5
-rw-r--r--src/include/replication/output_plugin.h69
-rw-r--r--src/include/replication/reorderbuffer.h59
3 files changed, 133 insertions, 0 deletions
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2f2475e5d..deef31825d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -80,6 +80,11 @@ typedef struct LogicalDecodingContext
void *output_writer_private;
/*
+ * Does the output plugin support streaming, and is it enabled?
+ */
+ bool streaming;
+
+ /*
* State for writing output.
*/
bool accept_writes;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 3dd9236c57..b78c796450 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -100,6 +100,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
/*
+ * Called when starting to stream a block of changes from in-progress
+ * transaction (may be called repeatedly, if it's streamed in multiple
+ * chunks).
+ */
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called when stopping to stream a block of changes from in-progress
+ * transaction to a remote node (may be called repeatedly, if it's streamed
+ * in multiple chunks).
+ */
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called to discard changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/*
+ * Called to apply changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Callback for streaming individual changes from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/*
+ * Callback for streaming generic logical decoding messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
+ * Callback for streaming truncates from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/*
* Output plugin callbacks
*/
typedef struct OutputPluginCallbacks
@@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+ /* streaming of changes */
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
/* Functions in replication/logical/logical.c */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1055e99e2e..42bc817648 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* start streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStartCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn);
+
+/* stop streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStopCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn);
+
+/* discard streamed transaction callback signature */
+typedef void (*ReorderBufferStreamAbortCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/* commit streamed transaction callback signature */
+typedef void (*ReorderBufferStreamCommitCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* stream change callback signature */
+typedef void (*ReorderBufferStreamChangeCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/* stream message callback signature */
+typedef void (*ReorderBufferStreamMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
+/* stream truncate callback signature */
+typedef void (*ReorderBufferStreamTruncateCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
struct ReorderBuffer
{
/*
@@ -387,6 +435,17 @@ struct ReorderBuffer
ReorderBufferMessageCB message;
/*
+ * Callbacks to be called when streaming a transaction.
+ */
+ ReorderBufferStreamStartCB stream_start;
+ ReorderBufferStreamStopCB stream_stop;
+ ReorderBufferStreamAbortCB stream_abort;
+ ReorderBufferStreamCommitCB stream_commit;
+ ReorderBufferStreamChangeCB stream_change;
+ ReorderBufferStreamMessageCB stream_message;
+ ReorderBufferStreamTruncateCB stream_truncate;
+
+ /*
* Pointer that will be passed untouched to the callbacks.
*/
void *private_data;