summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-04-30 15:02:14 -0400
committerRobert Haas <rhaas@postgresql.org>2015-04-30 15:02:14 -0400
commit924bcf4f16d54c55310b28f77686608684734f42 (patch)
tree79f35bf679c6a68dbe725cc90248a553f2b9e019 /src/include
parent669c7d20e6374850593cb430d332e11a3992bbcf (diff)
downloadpostgresql-924bcf4f16d54c55310b28f77686608684734f42.tar.gz
Create an infrastructure for parallel computation in PostgreSQL.
This does four basic things. First, it provides convenience routines to coordinate the startup and shutdown of parallel workers. Second, it synchronizes various pieces of state (e.g. GUCs, combo CID mappings, transaction snapshot) from the parallel group leader to the worker processes. Third, it prohibits various operations that would result in unsafe changes to that state while parallelism is active. Finally, it propagates events that would result in an ErrorResponse, NoticeResponse, or NotifyResponse message being sent to the client from the parallel workers back to the master, from which they can then be sent on to the client. Robert Haas, Amit Kapila, Noah Misch, Rushabh Lathia, Jeevan Chalke. Suggestions and review from Andres Freund, Heikki Linnakangas, Noah Misch, Simon Riggs, Euler Taveira, and Jim Nasby.
Diffstat (limited to 'src/include')
-rw-r--r--src/include/access/parallel.h68
-rw-r--r--src/include/access/xact.h11
-rw-r--r--src/include/catalog/namespace.h2
-rw-r--r--src/include/fmgr.h3
-rw-r--r--src/include/libpq/pqmq.h1
-rw-r--r--src/include/miscadmin.h1
-rw-r--r--src/include/postmaster/bgworker.h2
-rw-r--r--src/include/storage/procarray.h1
-rw-r--r--src/include/storage/procsignal.h1
-rw-r--r--src/include/utils/combocid.h3
-rw-r--r--src/include/utils/snapmgr.h5
11 files changed, 97 insertions, 1 deletions
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
new file mode 100644
index 0000000000..8274f841b6
--- /dev/null
+++ b/src/include/access/parallel.h
@@ -0,0 +1,68 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.h
+ * Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PARALLEL_H
+#define PARALLEL_H
+
+#include "access/xlogdefs.h"
+#include "lib/ilist.h"
+#include "postmaster/bgworker.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "utils/elog.h"
+
+typedef void (*parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc);
+
+typedef struct ParallelWorkerInfo
+{
+ BackgroundWorkerHandle *bgwhandle;
+ shm_mq_handle *error_mqh;
+ int32 pid;
+} ParallelWorkerInfo;
+
+typedef struct ParallelContext
+{
+ dlist_node node;
+ SubTransactionId subid;
+ int nworkers;
+ parallel_worker_main_type entrypoint;
+ char *library_name;
+ char *function_name;
+ ErrorContextCallback *error_context_stack;
+ shm_toc_estimator estimator;
+ dsm_segment *seg;
+ void *private;
+ shm_toc *toc;
+ ParallelWorkerInfo *worker;
+} ParallelContext;
+
+extern bool ParallelMessagePending;
+extern int ParallelWorkerNumber;
+
+#define IsParallelWorker() (ParallelWorkerNumber >= 0)
+
+extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
+extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
+extern void InitializeParallelDSM(ParallelContext *);
+extern void LaunchParallelWorkers(ParallelContext *);
+extern void WaitForParallelWorkersToFinish(ParallelContext *);
+extern void DestroyParallelContext(ParallelContext *);
+extern bool ParallelContextActive(void);
+
+extern void HandleParallelMessageInterrupt(void);
+extern void HandleParallelMessages(void);
+extern void AtEOXact_Parallel(bool isCommit);
+extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
+extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
+
+#endif /* PARALLEL_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cad1bb1d31..ed808fc150 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -78,9 +78,12 @@ extern bool MyXactAccessedTempRel;
typedef enum
{
XACT_EVENT_COMMIT,
+ XACT_EVENT_PARALLEL_COMMIT,
XACT_EVENT_ABORT,
+ XACT_EVENT_PARALLEL_ABORT,
XACT_EVENT_PREPARE,
XACT_EVENT_PRE_COMMIT,
+ XACT_EVENT_PARALLEL_PRE_COMMIT,
XACT_EVENT_PRE_PREPARE
} XactEvent;
@@ -332,6 +335,10 @@ extern void BeginInternalSubTransaction(char *name);
extern void ReleaseCurrentSubTransaction(void);
extern void RollbackAndReleaseCurrentSubTransaction(void);
extern bool IsSubTransaction(void);
+extern Size EstimateTransactionStateSpace(void);
+extern void SerializeTransactionState(Size maxsize, char *start_address);
+extern void StartParallelWorkerTransaction(char *tstatespace);
+extern void EndParallelWorkerTransaction(void);
extern bool IsTransactionBlock(void);
extern bool IsTransactionOrTransactionBlock(void);
extern char TransactionBlockStatusCode(void);
@@ -368,4 +375,8 @@ extern const char *xact_identify(uint8 info);
extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
+extern void EnterParallelMode(void);
+extern void ExitParallelMode(void);
+extern bool IsInParallelMode(void);
+
#endif /* XACT_H */
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index cf5f7d0a78..f3b005fa9d 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -140,7 +140,7 @@ extern Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding);
/* initialization & transaction cleanup code */
extern void InitializeSearchPath(void);
-extern void AtEOXact_Namespace(bool isCommit);
+extern void AtEOXact_Namespace(bool isCommit, bool parallel);
extern void AtEOSubXact_Namespace(bool isCommit, SubTransactionId mySubid,
SubTransactionId parentSubid);
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index 418f6aadaa..b9a5c40f59 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -642,6 +642,9 @@ extern PGFunction load_external_function(char *filename, char *funcname,
extern PGFunction lookup_external_function(void *filehandle, char *funcname);
extern void load_file(const char *filename, bool restricted);
extern void **find_rendezvous_variable(const char *varName);
+extern Size EstimateLibraryStateSpace(void);
+extern void SerializeLibraryState(Size maxsize, char *start_address);
+extern void RestoreLibraryState(char *start_address);
/*
* Support for aggregate functions
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index 5f2815ca90..ad7589d4ed 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
#include "storage/shm_mq.h"
extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index eacfccbcba..c389939738 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -271,6 +271,7 @@ extern void check_stack_depth(void);
/* in tcop/utility.c */
extern void PreventCommandIfReadOnly(const char *cmdname);
+extern void PreventCommandIfParallelMode(const char *cmdname);
extern void PreventCommandDuringRecovery(const char *cmdname);
/* in utils/misc/guc.c */
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
index a81b90badc..de9180df91 100644
--- a/src/include/postmaster/bgworker.h
+++ b/src/include/postmaster/bgworker.h
@@ -112,6 +112,8 @@ extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle,
extern BgwHandleStatus
WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *
handle, pid_t *pid);
+extern BgwHandleStatus
+WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *);
/* Terminate a bgworker */
extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 97c6e9344e..a9b40ed944 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot);
extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
TransactionId sourcexid);
+extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
extern RunningTransactions GetRunningTransactionData(void);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index ac9d236dec..af1a0cd71f 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -31,6 +31,7 @@ typedef enum
{
PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */
PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */
+ PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_DATABASE,
diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h
index ce7b47c24e..f2faa12623 100644
--- a/src/include/utils/combocid.h
+++ b/src/include/utils/combocid.h
@@ -21,5 +21,8 @@
*/
extern void AtEOXact_ComboCid(void);
+extern void RestoreComboCIDState(char *comboCIDstate);
+extern void SerializeComboCIDState(Size maxsize, char *start_address);
+extern Size EstimateComboCIDStateSpace(void);
#endif /* COMBOCID_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 64d2ec1e5e..f8524eb687 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -64,4 +64,9 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids)
extern void TeardownHistoricSnapshot(bool is_error);
extern bool HistoricSnapshotActive(void);
+extern Size EstimateSnapshotSpace(Snapshot snapshot);
+extern void SerializeSnapshot(Snapshot snapshot, char *start_address);
+extern Snapshot RestoreSnapshot(char *start_address);
+extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc);
+
#endif /* SNAPMGR_H */