summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/include')
-rw-r--r--src/include/executor/execAsync.h25
-rw-r--r--src/include/executor/nodeAppend.h2
-rw-r--r--src/include/executor/nodeForeignscan.h4
-rw-r--r--src/include/foreign/fdwapi.h14
-rw-r--r--src/include/nodes/execnodes.h37
-rw-r--r--src/include/nodes/plannodes.h6
-rw-r--r--src/include/optimizer/cost.h1
-rw-r--r--src/include/pgstat.h3
-rw-r--r--src/include/storage/latch.h1
9 files changed, 88 insertions, 5 deletions
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..724034f226
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncRequest(AsyncRequest *areq);
+extern void ExecAsyncConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncNotify(AsyncRequest *areq);
+extern void ExecAsyncResponse(AsyncRequest *areq);
+extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
+extern void ExecAsyncRequestPending(AsyncRequest *areq);
+
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index cafd410a5d..fa54ac6ad2 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAsyncAppendResponse(AsyncRequest *areq);
+
#endif /* NODEAPPEND_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 6ae7733e25..8ffc0ca5bf 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
+
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 248f78da45..7c89d081c7 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -178,6 +178,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+
+typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
+
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -256,6 +264,12 @@ typedef struct FdwRoutine
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncRequest_function ForeignAsyncRequest;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+ ForeignAsyncNotify_function ForeignAsyncNotify;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e31ad6204e..09ea7ef6a6 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -516,6 +516,22 @@ typedef struct ResultRelInfo
} ResultRelInfo;
/* ----------------
+ * AsyncRequest
+ *
+ * State for an asynchronous tuple request.
+ * ----------------
+ */
+typedef struct AsyncRequest
+{
+ struct PlanState *requestor; /* Node that wants a tuple */
+ struct PlanState *requestee; /* Node from which a tuple is wanted */
+ int request_index; /* Scratch space for requestor */
+ bool callback_pending; /* Callback is needed */
+ bool request_complete; /* Request complete, result valid */
+ TupleTableSlot *result; /* Result (NULL if no more tuples) */
+} AsyncRequest;
+
+/* ----------------
* EState information
*
* Working state for an Executor invocation
@@ -1199,12 +1215,12 @@ typedef struct ModifyTableState
* AppendState information
*
* nplans how many plans are in the array
- * whichplan which plan is being executed (0 .. n-1), or a
- * special negative value. See nodeAppend.c.
+ * whichplan which synchronous plan is being executed (0 .. n-1)
+ * or a special negative value. See nodeAppend.c.
* prune_state details required to allow partitions to be
* eliminated from the scan, or NULL if not possible.
- * valid_subplans for runtime pruning, valid appendplans indexes to
- * scan.
+ * valid_subplans for runtime pruning, valid synchronous appendplans
+ * indexes to scan.
* ----------------
*/
@@ -1220,12 +1236,25 @@ struct AppendState
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
+ bool as_begun; /* false means need to initialize */
+ Bitmapset *as_asyncplans; /* asynchronous plans indexes */
+ int as_nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **as_asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **as_asyncresults; /* unreturned results of async plans */
+ int as_nasyncresults; /* # of valid entries in as_asyncresults */
+ bool as_syncdone; /* true if all synchronous plans done in
+ * asynchronous mode, else false */
+ int as_nasyncremain; /* # of remaining asynchronous plans */
+ Bitmapset *as_needrequest; /* asynchronous plans needing a new request */
+ struct WaitEventSet *as_eventset; /* WaitEventSet used to configure
+ * file descriptor wait events */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
+ Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */
bool (*choose_next_subplan) (AppendState *);
};
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 6e62104d0b..24ca616740 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -130,6 +130,11 @@ typedef struct Plan
bool parallel_safe; /* OK to use as part of parallel plan? */
/*
+ * information needed for asynchronous execution
+ */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
+ /*
* Common structural data for all Plan types.
*/
int plan_node_id; /* unique across entire final plan tree */
@@ -245,6 +250,7 @@ typedef struct Append
Plan plan;
Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */
List *appendplans;
+ int nasyncplans; /* # of asynchronous plans */
/*
* All 'appendplans' preceding this index are non-partial plans. All
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 1be93be098..a3fd93fe07 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate;
extern PGDLLIMPORT bool enable_parallel_append;
extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
+extern PGDLLIMPORT bool enable_async_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 87672e6f30..d699502cd9 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -966,7 +966,8 @@ typedef enum
*/
typedef enum
{
- WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
+ WAIT_EVENT_APPEND_READY = PG_WAIT_IPC,
+ WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE,
WAIT_EVENT_BGWORKER_SHUTDOWN,
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 9e94fcaec2..44f9368c64 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -179,5 +179,6 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
extern void InitializeLatchWaitSet(void);
+extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
#endif /* LATCH_H */