summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/walreceiver.c182
-rw-r--r--src/backend/replication/walreceiverfuncs.c172
2 files changed, 128 insertions, 226 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index f805e673e1..4a5ba5b426 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void InitWalRcv(void);
-static void WalRcvKill(int code, Datum arg);
+static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
@@ -153,21 +152,57 @@ static struct
void
WalReceiverMain(void)
{
- sigjmp_buf local_sigjmp_buf;
- MemoryContext walrcv_context;
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_disconnect == NULL)
- elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+ /*
+ * WalRcv should be set up already (if we are a backend, we inherit
+ * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+ */
+ Assert(walrcv != NULL);
+
+ /*
+ * Mark walreceiver as running in shared memory.
+ *
+ * Do this as early as possible, so that if we fail later on, we'll
+ * set state to STOPPED. If we die before this, the startup process
+ * will keep waiting for us to start up, until it times out.
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->pid == 0);
+ switch(walrcv->walRcvState)
+ {
+ case WALRCV_STOPPING:
+ /* If we've already been requested to stop, don't start up. */
+ walrcv->walRcvState = WALRCV_STOPPED;
+ /* fall through */
+
+ case WALRCV_STOPPED:
+ SpinLockRelease(&walrcv->mutex);
+ proc_exit(1);
+ break;
+
+ case WALRCV_STARTING:
+ /* The usual case */
+ break;
+
+ case WALRCV_RUNNING:
+ /* Shouldn't happen */
+ elog(PANIC, "walreceiver still running according to shared memory state");
+ }
+ /* Advertise our PID so that the startup process can kill us */
+ walrcv->pid = MyProcPid;
+ walrcv->walRcvState = WALRCV_RUNNING;
+
+ /* Fetch information required to start streaming */
+ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ startpoint = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
- /* Mark walreceiver in progress */
- InitWalRcv();
+ /* Arrange to clean up at walreceiver exit */
+ on_shmem_exit(WalRcvDie, 0);
/*
* If possible, make this process a group leader, so that the postmaster
@@ -200,81 +235,21 @@ WalReceiverMain(void)
/* We allow SIGQUIT (quickdie) at all times */
sigdelset(&BlockSig, SIGQUIT);
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+ if (walrcv_connect == NULL || walrcv_receive == NULL ||
+ walrcv_disconnect == NULL)
+ elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+
/*
* Create a resource owner to keep track of our resources (not clear that
* we need this, but may as well have one).
*/
CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
- /*
- * Create a memory context that we will do all our work in. We do this so
- * that we can reset the context during error recovery and thereby avoid
- * possible memory leaks.
- */
- walrcv_context = AllocSetContextCreate(TopMemoryContext,
- "Wal Receiver",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- MemoryContextSwitchTo(walrcv_context);
-
- /*
- * If an exception is encountered, processing resumes here.
- *
- * This code is heavily based on bgwriter.c, q.v.
- */
- if (sigsetjmp(local_sigjmp_buf, 1) != 0)
- {
- /* Since not using PG_TRY, must reset error stack by hand */
- error_context_stack = NULL;
-
- /* Reset WalRcvImmediateInterruptOK */
- DisableWalRcvImmediateExit();
-
- /* Prevent interrupts while cleaning up */
- HOLD_INTERRUPTS();
-
- /* Report the error to the server log */
- EmitErrorReport();
-
- /* Disconnect any previous connection. */
- EnableWalRcvImmediateExit();
- walrcv_disconnect();
- DisableWalRcvImmediateExit();
-
- /*
- * Now return to normal top-level context and clear ErrorContext for
- * next time.
- */
- MemoryContextSwitchTo(walrcv_context);
- FlushErrorState();
-
- /* Flush any leaked data in the top-level context */
- MemoryContextResetAndDeleteChildren(walrcv_context);
-
- /* Now we can allow interrupts again */
- RESUME_INTERRUPTS();
-
- /*
- * Sleep at least 1 second after any error. A write error is likely
- * to be repeated, and we don't want to be filling the error logs as
- * fast as we can.
- */
- pg_usleep(1000000L);
- }
-
- /* We can now handle ereport(ERROR) */
- PG_exception_stack = &local_sigjmp_buf;
-
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
- /* Fetch connection information from shared memory */
- SpinLockAcquire(&walrcv->mutex);
- strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
- startpoint = walrcv->receivedUpto;
- SpinLockRelease(&walrcv->mutex);
-
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
walrcv_connect(conninfo, startpoint);
@@ -330,63 +305,24 @@ WalReceiverMain(void)
}
}
-/* Advertise our pid in shared memory, so that startup process can kill us. */
-static void
-InitWalRcv(void)
-{
- /* use volatile pointer to prevent code rearrangement */
- volatile WalRcvData *walrcv = WalRcv;
-
- /*
- * WalRcv should be set up already (if we are a backend, we inherit
- * this by fork() or EXEC_BACKEND mechanism from the postmaster).
- */
- if (walrcv == NULL)
- elog(PANIC, "walreceiver control data uninitialized");
-
- /* If we've already been requested to stop, don't start up */
- SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->pid == 0);
- if (walrcv->walRcvState == WALRCV_STOPPED ||
- walrcv->walRcvState == WALRCV_STOPPING)
- {
- walrcv->walRcvState = WALRCV_STOPPED;
- SpinLockRelease(&walrcv->mutex);
- proc_exit(1);
- }
- walrcv->pid = MyProcPid;
- SpinLockRelease(&walrcv->mutex);
-
- /* Arrange to clean up at walreceiver exit */
- on_shmem_exit(WalRcvKill, 0);
-}
-
/*
- * Clear our pid from shared memory at exit.
+ * Mark us as STOPPED in shared memory at exit.
*/
static void
-WalRcvKill(int code, Datum arg)
+WalRcvDie(int code, Datum arg)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- bool stopped = false;
SpinLockAcquire(&walrcv->mutex);
- if (walrcv->walRcvState == WALRCV_STOPPING ||
- walrcv->walRcvState == WALRCV_STOPPED)
- {
- walrcv->walRcvState = WALRCV_STOPPED;
- stopped = true;
- elog(LOG, "walreceiver stopped");
- }
+ Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+ walrcv->walRcvState == WALRCV_STOPPING);
+ walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex);
+ /* Terminate the connection gracefully. */
walrcv_disconnect();
-
- /* If requested to stop, tell postmaster to not restart us. */
- if (stopped)
- SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
}
/* SIGHUP: set flag to re-read config file at next convenient time */
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index c1d7b55887..4fb132dcd4 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -18,6 +18,8 @@
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/time.h>
+#include <time.h>
#include <unistd.h>
#include <signal.h>
@@ -30,8 +32,11 @@
WalRcvData *WalRcv = NULL;
-static bool CheckForStandbyTrigger(void);
-static void ShutdownWalRcv(void);
+/*
+ * How long to wait for walreceiver to start up after requesting
+ * postmaster to launch it. In seconds.
+ */
+#define WALRCV_STARTUP_TIMEOUT 10
/* Report shared memory space needed by WalRcvShmemInit */
Size
@@ -62,7 +67,7 @@ WalRcvShmemInit(void)
/* Initialize the data structures */
MemSet(WalRcv, 0, WalRcvShmemSize());
- WalRcv->walRcvState = WALRCV_NOT_STARTED;
+ WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
}
@@ -73,90 +78,51 @@ WalRcvInProgress(void)
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
WalRcvState state;
+ pg_time_t startTime;
SpinLockAcquire(&walrcv->mutex);
- state = walrcv->walRcvState;
- SpinLockRelease(&walrcv->mutex);
- if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
- return true;
- else
- return false;
-}
-
-/*
- * Wait for the XLOG record at given position to become available.
- *
- * 'recptr' indicates the byte position which caller wants to read the
- * XLOG record up to. The byte position actually written and flushed
- * by walreceiver is returned. It can be higher than the requested
- * location, and the caller can safely read up to that point without
- * calling WaitNextXLogAvailable() again.
- *
- * If WAL streaming is ended (because a trigger file is found), *finished
- * is set to true and function returns immediately. The returned position
- * can be lower than requested in that case.
- *
- * Called by the startup process during streaming recovery.
- */
-XLogRecPtr
-WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
-{
- static XLogRecPtr receivedUpto = {0, 0};
-
- *finished = false;
+ state = walrcv->walRcvState;
+ startTime = walrcv->startTime;
- /* Quick exit if already known available */
- if (XLByteLT(recptr, receivedUpto))
- return receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
- for (;;)
+ /*
+ * If it has taken too long for walreceiver to start up, give up.
+ * Setting the state to STOPPED ensures that if walreceiver later
+ * does start up after all, it will see that it's not supposed to be
+ * running and die without doing anything.
+ */
+ if (state == WALRCV_STARTING)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile WalRcvData *walrcv = WalRcv;
-
- /* Update local status */
- SpinLockAcquire(&walrcv->mutex);
- receivedUpto = walrcv->receivedUpto;
- SpinLockRelease(&walrcv->mutex);
+ pg_time_t now = (pg_time_t) time(NULL);
- /* If available already, leave here */
- if (XLByteLT(recptr, receivedUpto))
- return receivedUpto;
-
- /* Check to see if the trigger file exists */
- if (CheckForStandbyTrigger())
+ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
- *finished = true;
- return receivedUpto;
- }
+ SpinLockAcquire(&walrcv->mutex);
- pg_usleep(100000L); /* 100ms */
-
- /*
- * This possibly-long loop needs to handle interrupts of startup
- * process.
- */
- HandleStartupProcInterrupts();
+ if (walrcv->walRcvState == WALRCV_STARTING)
+ state = walrcv->walRcvState = WALRCV_STOPPED;
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (!PostmasterIsAlive(true))
- exit(1);
+ SpinLockRelease(&walrcv->mutex);
+ }
}
+
+ if (state != WALRCV_STOPPED)
+ return true;
+ else
+ return false;
}
/*
- * Stop walreceiver and wait for it to die.
+ * Stop walreceiver (if running) and wait for it to die.
*/
-static void
+void
ShutdownWalRcv(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- pid_t walrcvpid;
+ pid_t walrcvpid = 0;
/*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -164,15 +130,25 @@ ShutdownWalRcv(void)
* restart itself.
*/
SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->walRcvState == WALRCV_RUNNING);
- walrcv->walRcvState = WALRCV_STOPPING;
- walrcvpid = walrcv->pid;
+ switch(walrcv->walRcvState)
+ {
+ case WALRCV_STOPPED:
+ break;
+ case WALRCV_STARTING:
+ walrcv->walRcvState = WALRCV_STOPPED;
+ break;
+
+ case WALRCV_RUNNING:
+ walrcv->walRcvState = WALRCV_STOPPING;
+ /* fall through */
+ case WALRCV_STOPPING:
+ walrcvpid = walrcv->pid;
+ break;
+ }
SpinLockRelease(&walrcv->mutex);
/*
- * Pid can be 0, if no walreceiver process is active right now.
- * Postmaster should restart it, and when it does, it will see the
- * STOPPING state.
+ * Signal walreceiver process if it was still running.
*/
if (walrcvpid != 0)
kill(walrcvpid, SIGTERM);
@@ -194,30 +170,6 @@ ShutdownWalRcv(void)
}
/*
- * Check to see if the trigger file exists. If it does, request postmaster
- * to shut down walreceiver and wait for it to exit, and remove the trigger
- * file.
- */
-static bool
-CheckForStandbyTrigger(void)
-{
- struct stat stat_buf;
-
- if (TriggerFile == NULL)
- return false;
-
- if (stat(TriggerFile, &stat_buf) == 0)
- {
- ereport(LOG,
- (errmsg("trigger file found: %s", TriggerFile)));
- ShutdownWalRcv();
- unlink(TriggerFile);
- return true;
- }
- return false;
-}
-
-/*
* Request postmaster to start walreceiver.
*
* recptr indicates the position where streaming should begin, and conninfo
@@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
+ pg_time_t now = (pg_time_t) time(NULL);
- Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
+ /*
+ * We always start at the beginning of the segment.
+ * That prevents a broken segment (i.e., with no records in the
+ * first half of a segment) from being created by XLOG streaming,
+ * which might cause trouble later on if the segment is e.g
+ * archived.
+ */
+ if (recptr.xrecoff % XLogSegSize != 0)
+ recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
+
+ /* It better be stopped before we try to restart it */
+ Assert(walrcv->walRcvState == WALRCV_STOPPED);
- /* locking is just pro forma here; walreceiver isn't started yet */
SpinLockAcquire(&walrcv->mutex);
- walrcv->receivedUpto = recptr;
if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
- walrcv->walRcvState = WALRCV_RUNNING;
+ walrcv->walRcvState = WALRCV_STARTING;
+ walrcv->startTime = now;
+
+ walrcv->receivedUpto = recptr;
SpinLockRelease(&walrcv->mutex);
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
@@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void)
return recptr;
}
+