diff options
Diffstat (limited to 'src/bin')
| -rw-r--r-- | src/bin/initdb/initdb.c | 1 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/pg_receivexlog.c | 5 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 49 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 1 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/streamutil.h | 1 |
5 files changed, 51 insertions, 6 deletions
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 6b5302f6fd..a71320d945 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -195,6 +195,7 @@ const char *subdirs[] = { "pg_multixact/offsets", "base", "base/1", + "pg_replslot", "pg_tblspc", "pg_stat", "pg_stat_tmp" diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 3c6ab9a902..8a702e3388 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -67,6 +67,7 @@ usage(void) printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -w, --no-password never prompt for password\n")); printf(_(" -W, --password force password prompt (should happen automatically)\n")); + printf(_(" --slot replication slot to use\n")); printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n")); } @@ -343,6 +344,7 @@ main(int argc, char **argv) {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, + {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} }; @@ -409,6 +411,9 @@ main(int argc, char **argv) exit(1); } break; + case 'S': + replication_slot = pg_strdup(optarg); + break; case 'n': noloop = 1; break; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 2555904cd0..7d3c76c994 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -31,6 +31,8 @@ /* fd and filename for currently open WAL file */ static int walfile = -1; static char current_walfile_name[MAXPGPATH] = ""; +static bool reportFlushPosition = false; +static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, @@ -133,7 +135,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. */ static bool -close_walfile(char *basedir, char *partial_suffix) +close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) { off_t currpos; @@ -187,6 +189,7 @@ close_walfile(char *basedir, char *partial_suffix) _("%s: not renaming \"%s%s\", segment is not complete\n"), progname, current_walfile_name, partial_suffix); + lastFlushPosition = pos; return true; } @@ -421,7 +424,10 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) len += 1; sendint64(blockpos, &replybuf[len]); /* write */ len += 8; - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ + if (reportFlushPosition) + sendint64(lastFlushPosition, &replybuf[len]); /* flush */ + else + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ len += 8; sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ len += 8; @@ -511,6 +517,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, int standby_message_timeout, char *partial_suffix) { char query[128]; + char slotcmd[128]; PGresult *res; XLogRecPtr stoppos; @@ -521,6 +528,29 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (!CheckServerVersionForStreaming(conn)) return false; + if (replication_slot != NULL) + { + /* + * Report the flush position, so the primary can know what WAL we'll + * possibly re-request, and remove older WAL safely. + * + * We only report it when a slot has explicitly been used, because + * reporting the flush position makes one elegible as a synchronous + * replica. People shouldn't include generic names in + * synchronous_standby_names, but we've protected them against it so + * far, so let's continue to do so in the situations when possible. + * If they've got a slot, though, we need to report the flush position, + * so that the master can remove WAL. + */ + reportFlushPosition = true; + sprintf(slotcmd, "SLOT \"%s\" ", replication_slot); + } + else + { + reportFlushPosition = false; + slotcmd[0] = 0; + } + if (sysidentifier != NULL) { /* Validate system identifier hasn't changed */ @@ -560,6 +590,12 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); } + /* + * initialize flush position to starting point, it's the caller's + * responsibility that that's sane. + */ + lastFlushPosition = startpos; + while (1) { /* @@ -606,7 +642,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, return true; /* Initiate the replication stream at specified location */ - snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u", + snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", + slotcmd, (uint32) (startpos >> 32), (uint32) startpos, timeline); res = PQexec(conn, query); @@ -810,7 +847,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, */ if (still_sending && stream_stop(blockpos, timeline, false)) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, blockpos)) { /* Potential error message is written by close_walfile */ goto error; @@ -909,7 +946,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, */ if (still_sending) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, blockpos)) { /* Error message written in close_walfile() */ goto error; @@ -1074,7 +1111,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Did we reach the end of a WAL segment? */ if (blockpos % XLOG_SEG_SIZE == 0) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, blockpos)) /* Error message written in close_walfile() */ goto error; diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 96fbed898f..041076ff1d 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -22,6 +22,7 @@ char *connection_string = NULL; char *dbhost = NULL; char *dbuser = NULL; char *dbport = NULL; +char *replication_slot = NULL; int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ static char *dbpassword = NULL; PGconn *conn = NULL; diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 77d6b86ced..bb3c34db07 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -6,6 +6,7 @@ extern char *dbhost; extern char *dbuser; extern char *dbport; extern int dbgetpassword; +extern char *replication_slot; /* Connection kept global so we can disconnect easily */ extern PGconn *conn; |
