diff options
Diffstat (limited to 'src/include')
| -rw-r--r-- | src/include/executor/execAsync.h | 25 | ||||
| -rw-r--r-- | src/include/executor/nodeAppend.h | 2 | ||||
| -rw-r--r-- | src/include/executor/nodeForeignscan.h | 4 | ||||
| -rw-r--r-- | src/include/foreign/fdwapi.h | 14 | ||||
| -rw-r--r-- | src/include/nodes/execnodes.h | 37 | ||||
| -rw-r--r-- | src/include/nodes/plannodes.h | 6 | ||||
| -rw-r--r-- | src/include/optimizer/cost.h | 1 | ||||
| -rw-r--r-- | src/include/pgstat.h | 3 | ||||
| -rw-r--r-- | src/include/storage/latch.h | 1 |
9 files changed, 88 insertions, 5 deletions
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h new file mode 100644 index 0000000000..724034f226 --- /dev/null +++ b/src/include/executor/execAsync.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * execAsync.h + * Support functions for asynchronous execution + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execAsync.h + *------------------------------------------------------------------------- + */ + +#ifndef EXECASYNC_H +#define EXECASYNC_H + +#include "nodes/execnodes.h" + +extern void ExecAsyncRequest(AsyncRequest *areq); +extern void ExecAsyncConfigureWait(AsyncRequest *areq); +extern void ExecAsyncNotify(AsyncRequest *areq); +extern void ExecAsyncResponse(AsyncRequest *areq); +extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result); +extern void ExecAsyncRequestPending(AsyncRequest *areq); + +#endif /* EXECASYNC_H */ diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index cafd410a5d..fa54ac6ad2 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt); extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt); +extern void ExecAsyncAppendResponse(AsyncRequest *areq); + #endif /* NODEAPPEND_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 6ae7733e25..8ffc0ca5bf 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownForeignScan(ForeignScanState *node); +extern void ExecAsyncForeignScanRequest(AsyncRequest *areq); +extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq); +extern void ExecAsyncForeignScanNotify(AsyncRequest *areq); + #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 248f78da45..7c89d081c7 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -178,6 +178,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); + +typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -256,6 +264,12 @@ typedef struct FdwRoutine /* Support functions for path reparameterization. */ ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild; + + /* Support functions for asynchronous execution */ + IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable; + ForeignAsyncRequest_function ForeignAsyncRequest; + ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; + ForeignAsyncNotify_function ForeignAsyncNotify; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e31ad6204e..09ea7ef6a6 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -516,6 +516,22 @@ typedef struct ResultRelInfo } ResultRelInfo; /* ---------------- + * AsyncRequest + * + * State for an asynchronous tuple request. + * ---------------- + */ +typedef struct AsyncRequest +{ + struct PlanState *requestor; /* Node that wants a tuple */ + struct PlanState *requestee; /* Node from which a tuple is wanted */ + int request_index; /* Scratch space for requestor */ + bool callback_pending; /* Callback is needed */ + bool request_complete; /* Request complete, result valid */ + TupleTableSlot *result; /* Result (NULL if no more tuples) */ +} AsyncRequest; + +/* ---------------- * EState information * * Working state for an Executor invocation @@ -1199,12 +1215,12 @@ typedef struct ModifyTableState * AppendState information * * nplans how many plans are in the array - * whichplan which plan is being executed (0 .. n-1), or a - * special negative value. See nodeAppend.c. + * whichplan which synchronous plan is being executed (0 .. n-1) + * or a special negative value. See nodeAppend.c. * prune_state details required to allow partitions to be * eliminated from the scan, or NULL if not possible. - * valid_subplans for runtime pruning, valid appendplans indexes to - * scan. + * valid_subplans for runtime pruning, valid synchronous appendplans + * indexes to scan. * ---------------- */ @@ -1220,12 +1236,25 @@ struct AppendState PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; + bool as_begun; /* false means need to initialize */ + Bitmapset *as_asyncplans; /* asynchronous plans indexes */ + int as_nasyncplans; /* # of asynchronous plans */ + AsyncRequest **as_asyncrequests; /* array of AsyncRequests */ + TupleTableSlot **as_asyncresults; /* unreturned results of async plans */ + int as_nasyncresults; /* # of valid entries in as_asyncresults */ + bool as_syncdone; /* true if all synchronous plans done in + * asynchronous mode, else false */ + int as_nasyncremain; /* # of remaining asynchronous plans */ + Bitmapset *as_needrequest; /* asynchronous plans needing a new request */ + struct WaitEventSet *as_eventset; /* WaitEventSet used to configure + * file descriptor wait events */ int as_first_partial_plan; /* Index of 'appendplans' containing * the first partial plan */ ParallelAppendState *as_pstate; /* parallel coordination info */ Size pstate_len; /* size of parallel coordination info */ struct PartitionPruneState *as_prune_state; Bitmapset *as_valid_subplans; + Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */ bool (*choose_next_subplan) (AppendState *); }; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 6e62104d0b..24ca616740 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -130,6 +130,11 @@ typedef struct Plan bool parallel_safe; /* OK to use as part of parallel plan? */ /* + * information needed for asynchronous execution + */ + bool async_capable; /* engage asynchronous-capable logic? */ + + /* * Common structural data for all Plan types. */ int plan_node_id; /* unique across entire final plan tree */ @@ -245,6 +250,7 @@ typedef struct Append Plan plan; Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */ List *appendplans; + int nasyncplans; /* # of asynchronous plans */ /* * All 'appendplans' preceding this index are non-partial plans. All diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 1be93be098..a3fd93fe07 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate; extern PGDLLIMPORT bool enable_parallel_append; extern PGDLLIMPORT bool enable_parallel_hash; extern PGDLLIMPORT bool enable_partition_pruning; +extern PGDLLIMPORT bool enable_async_append; extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 87672e6f30..d699502cd9 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -966,7 +966,8 @@ typedef enum */ typedef enum { - WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC, + WAIT_EVENT_APPEND_READY = PG_WAIT_IPC, + WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE, WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WAIT_EVENT_BTREE_PAGE, diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 9e94fcaec2..44f9368c64 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -179,5 +179,6 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout, extern int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); extern void InitializeLatchWaitSet(void); +extern int GetNumRegisteredWaitEvents(WaitEventSet *set); #endif /* LATCH_H */ |
