diff options
Diffstat (limited to 'src/include/replication')
| -rw-r--r-- | src/include/replication/logical.h | 5 | ||||
| -rw-r--r-- | src/include/replication/output_plugin.h | 69 | ||||
| -rw-r--r-- | src/include/replication/reorderbuffer.h | 59 |
3 files changed, 133 insertions, 0 deletions
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c2f2475e5d..deef31825d 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -80,6 +80,11 @@ typedef struct LogicalDecodingContext void *output_writer_private; /* + * Does the output plugin support streaming, and is it enabled? + */ + bool streaming; + + /* * State for writing output. */ bool accept_writes; diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 3dd9236c57..b78c796450 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -100,6 +100,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); /* + * Called when starting to stream a block of changes from in-progress + * transaction (may be called repeatedly, if it's streamed in multiple + * chunks). + */ +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called when stopping to stream a block of changes from in-progress + * transaction to a remote node (may be called repeatedly, if it's streamed + * in multiple chunks). + */ +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called to discard changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* + * Called to apply changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Callback for streaming individual changes from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* + * Callback for streaming generic logical decoding messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* + * Callback for streaming truncates from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +/* * Output plugin callbacks */ typedef struct OutputPluginCallbacks @@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + /* streaming of changes */ + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; /* Functions in replication/logical/logical.c */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1055e99e2e..42bc817648 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -348,6 +348,54 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* start streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStartCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr first_lsn); + +/* stop streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStopCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr last_lsn); + +/* discard streamed transaction callback signature */ +typedef void (*ReorderBufferStreamAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* commit streamed transaction callback signature */ +typedef void (*ReorderBufferStreamCommitCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* stream change callback signature */ +typedef void (*ReorderBufferStreamChangeCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* stream message callback signature */ +typedef void (*ReorderBufferStreamMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + +/* stream truncate callback signature */ +typedef void (*ReorderBufferStreamTruncateCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + struct ReorderBuffer { /* @@ -387,6 +435,17 @@ struct ReorderBuffer ReorderBufferMessageCB message; /* + * Callbacks to be called when streaming a transaction. + */ + ReorderBufferStreamStartCB stream_start; + ReorderBufferStreamStopCB stream_stop; + ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamCommitCB stream_commit; + ReorderBufferStreamChangeCB stream_change; + ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamTruncateCB stream_truncate; + + /* * Pointer that will be passed untouched to the callbacks. */ void *private_data; |
