summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/executor/execParallel.c26
-rw-r--r--src/backend/executor/nodeCustom.c45
-rw-r--r--src/backend/executor/nodeForeignscan.c62
-rw-r--r--src/include/executor/nodeCustom.h11
-rw-r--r--src/include/executor/nodeForeignscan.h8
-rw-r--r--src/include/foreign/fdwapi.h14
-rw-r--r--src/include/nodes/execnodes.h14
7 files changed, 179 insertions, 1 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 29e450a571..95e8e41d2b 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,8 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeCustom.h"
+#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
@@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanEstimate((ForeignScanState *) planstate,
+ e->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanEstimate((CustomScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
@@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
+ d->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeDSM((CustomScanState *) planstate,
+ d->pcxt);
+ break;
default:
break;
}
@@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
case T_SeqScanState:
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
+ case T_ForeignScanState:
+ ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
+ toc);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanInitializeWorker((CustomScanState *) planstate,
+ toc);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 640289e277..322abca282 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -10,6 +10,7 @@
*/
#include "postgres.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "nodes/execnodes.h"
@@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node)
node->methods->CustomName)));
node->methods->RestrPosCustomScan(node);
}
+
+void
+ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->EstimateDSMCustomScan)
+ {
+ node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+void
+ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeDSMCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ methods->InitializeDSMCustomScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+void
+ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->InitializeWorkerCustomScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ methods->InitializeWorkerCustomScan(node, toc, coordinate);
+ }
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 64a07bcc77..388c922749 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node)
ExecScanReScan(&node->ss);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanEstimate
+ *
+ * Informs size of the parallel coordination information, if any
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->EstimateDSMForeignScan)
+ {
+ node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialize the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeDSMForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
+ shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanInitializeDSM
+ *
+ * Initialization according to the parallel coordination information
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->InitializeWorkerForeignScan)
+ {
+ int plan_node_id = node->ss.ps.plan->plan_node_id;
+ void *coordinate;
+
+ coordinate = shm_toc_lookup(toc, plan_node_id);
+ fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
+ }
+}
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index e244942d79..410a3ad14d 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -12,6 +12,7 @@
#ifndef NODECUSTOM_H
#define NODECUSTOM_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
/*
@@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node);
extern void ExecCustomMarkPos(CustomScanState *node);
extern void ExecCustomRestrPos(CustomScanState *node);
+/*
+ * Parallel execution support
+ */
+extern void ExecCustomScanEstimate(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeDSM(CustomScanState *node,
+ ParallelContext *pcxt);
+extern void ExecCustomScanInitializeWorker(CustomScanState *node,
+ shm_toc *toc);
+
#endif /* NODECUSTOM_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index a92ce5c22a..c2553295fa 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -14,6 +14,7 @@
#ifndef NODEFOREIGNSCAN_H
#define NODEFOREIGNSCAN_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags);
@@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node);
extern void ExecEndForeignScan(ForeignScanState *node);
extern void ExecReScanForeignScan(ForeignScanState *node);
+extern void ExecForeignScanEstimate(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
+ ParallelContext *pcxt);
+extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
+ shm_toc *toc);
+
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index db73233f65..e16fbf34ec 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -12,6 +12,7 @@
#ifndef FDWAPI_H
#define FDWAPI_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
@@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
Oid serverOid);
+typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt);
+typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate);
+typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -177,6 +186,11 @@ typedef struct FdwRoutine
/* Support functions for IMPORT FOREIGN SCHEMA */
ImportForeignSchema_function ImportForeignSchema;
+
+ /* Support functions for parallelism under Gather node */
+ EstimateDSMForeignScan_function EstimateDSMForeignScan;
+ InitializeDSMForeignScan_function InitializeDSMForeignScan;
+ InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20ac50..064a0509c4 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1585,6 +1585,7 @@ typedef struct ForeignScanState
{
ScanState ss; /* its first field is NodeTag */
List *fdw_recheck_quals; /* original quals not in ss.ps.qual */
+ Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
void *fdw_state; /* foreign-data wrapper can keep state here */
@@ -1603,6 +1604,8 @@ typedef struct ForeignScanState
* the BeginCustomScan method.
* ----------------
*/
+struct ParallelContext; /* avoid including parallel.h here */
+struct shm_toc; /* avoid including shm_toc.h here */
struct ExplainState; /* avoid including explain.h here */
struct CustomScanState;
@@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods
void (*ReScanCustomScan) (struct CustomScanState *node);
void (*MarkPosCustomScan) (struct CustomScanState *node);
void (*RestrPosCustomScan) (struct CustomScanState *node);
-
+ /* Optional: parallel execution support */
+ Size (*EstimateDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt);
+ void (*InitializeDSMCustomScan) (struct CustomScanState *node,
+ struct ParallelContext *pcxt,
+ void *coordinate);
+ void (*InitializeWorkerCustomScan) (struct CustomScanState *node,
+ struct shm_toc *toc,
+ void *coordinate);
/* Optional: print additional information in EXPLAIN */
void (*ExplainCustomScan) (struct CustomScanState *node,
List *ancestors,
@@ -1631,6 +1642,7 @@ typedef struct CustomScanState
ScanState ss;
uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */
List *custom_ps; /* list of child PlanState nodes, if any */
+ Size pscan_len; /* size of parallel coordination information */
const CustomExecMethods *methods;
} CustomScanState;