summaryrefslogtreecommitdiff
path: root/src/bin
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin')
-rw-r--r--src/bin/initdb/initdb.c1
-rw-r--r--src/bin/pg_basebackup/pg_receivexlog.c5
-rw-r--r--src/bin/pg_basebackup/receivelog.c49
-rw-r--r--src/bin/pg_basebackup/streamutil.c1
-rw-r--r--src/bin/pg_basebackup/streamutil.h1
5 files changed, 51 insertions, 6 deletions
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 6b5302f6fd..a71320d945 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -195,6 +195,7 @@ const char *subdirs[] = {
"pg_multixact/offsets",
"base",
"base/1",
+ "pg_replslot",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp"
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 3c6ab9a902..8a702e3388 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -67,6 +67,7 @@ usage(void)
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_(" --slot replication slot to use\n"));
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
@@ -343,6 +344,7 @@ main(int argc, char **argv)
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
+ {"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
};
@@ -409,6 +411,9 @@ main(int argc, char **argv)
exit(1);
}
break;
+ case 'S':
+ replication_slot = pg_strdup(optarg);
+ break;
case 'n':
noloop = 1;
break;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 2555904cd0..7d3c76c994 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -31,6 +31,8 @@
/* fd and filename for currently open WAL file */
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
+static bool reportFlushPosition = false;
+static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
@@ -133,7 +135,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
-close_walfile(char *basedir, char *partial_suffix)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
{
off_t currpos;
@@ -187,6 +189,7 @@ close_walfile(char *basedir, char *partial_suffix)
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
+ lastFlushPosition = pos;
return true;
}
@@ -421,7 +424,10 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
len += 1;
sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
- sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
+ if (reportFlushPosition)
+ sendint64(lastFlushPosition, &replybuf[len]); /* flush */
+ else
+ sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
len += 8;
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
@@ -511,6 +517,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
int standby_message_timeout, char *partial_suffix)
{
char query[128];
+ char slotcmd[128];
PGresult *res;
XLogRecPtr stoppos;
@@ -521,6 +528,29 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (!CheckServerVersionForStreaming(conn))
return false;
+ if (replication_slot != NULL)
+ {
+ /*
+ * Report the flush position, so the primary can know what WAL we'll
+ * possibly re-request, and remove older WAL safely.
+ *
+ * We only report it when a slot has explicitly been used, because
+ * reporting the flush position makes one elegible as a synchronous
+ * replica. People shouldn't include generic names in
+ * synchronous_standby_names, but we've protected them against it so
+ * far, so let's continue to do so in the situations when possible.
+ * If they've got a slot, though, we need to report the flush position,
+ * so that the master can remove WAL.
+ */
+ reportFlushPosition = true;
+ sprintf(slotcmd, "SLOT \"%s\" ", replication_slot);
+ }
+ else
+ {
+ reportFlushPosition = false;
+ slotcmd[0] = 0;
+ }
+
if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
@@ -560,6 +590,12 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
}
+ /*
+ * initialize flush position to starting point, it's the caller's
+ * responsibility that that's sane.
+ */
+ lastFlushPosition = startpos;
+
while (1)
{
/*
@@ -606,7 +642,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
return true;
/* Initiate the replication stream at specified location */
- snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+ slotcmd,
(uint32) (startpos >> 32), (uint32) startpos,
timeline);
res = PQexec(conn, query);
@@ -810,7 +847,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending && stream_stop(blockpos, timeline, false))
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
goto error;
@@ -909,7 +946,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
goto error;
@@ -1074,7 +1111,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
/* Error message written in close_walfile() */
goto error;
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 96fbed898f..041076ff1d 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -22,6 +22,7 @@ char *connection_string = NULL;
char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
+char *replication_slot = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
static char *dbpassword = NULL;
PGconn *conn = NULL;
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 77d6b86ced..bb3c34db07 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -6,6 +6,7 @@ extern char *dbhost;
extern char *dbuser;
extern char *dbport;
extern int dbgetpassword;
+extern char *replication_slot;
/* Connection kept global so we can disconnect easily */
extern PGconn *conn;