diff options
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/walreceiver.c | 182 |
| -rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 172 |
2 files changed, 128 insertions, 226 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index f805e673e1..4a5ba5b426 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static void InitWalRcv(void); -static void WalRcvKill(int code, Datum arg); +static void WalRcvDie(int code, Datum arg); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); @@ -153,21 +152,57 @@ static struct void WalReceiverMain(void) { - sigjmp_buf local_sigjmp_buf; - MemoryContext walrcv_context; char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - /* Load the libpq-specific functions */ - load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_disconnect == NULL) - elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + /* + * WalRcv should be set up already (if we are a backend, we inherit + * this by fork() or EXEC_BACKEND mechanism from the postmaster). + */ + Assert(walrcv != NULL); + + /* + * Mark walreceiver as running in shared memory. + * + * Do this as early as possible, so that if we fail later on, we'll + * set state to STOPPED. If we die before this, the startup process + * will keep waiting for us to start up, until it times out. 
+ */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->pid == 0); + switch(walrcv->walRcvState) + { + case WALRCV_STOPPING: + /* If we've already been requested to stop, don't start up. */ + walrcv->walRcvState = WALRCV_STOPPED; + /* fall through */ + + case WALRCV_STOPPED: + SpinLockRelease(&walrcv->mutex); + proc_exit(1); + break; + + case WALRCV_STARTING: + /* The usual case */ + break; + + case WALRCV_RUNNING: + /* Shouldn't happen */ + elog(PANIC, "walreceiver still running according to shared memory state"); + } + /* Advertise our PID so that the startup process can kill us */ + walrcv->pid = MyProcPid; + walrcv->walRcvState = WALRCV_RUNNING; + + /* Fetch information required to start streaming */ + strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + startpoint = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); - /* Mark walreceiver in progress */ - InitWalRcv(); + /* Arrange to clean up at walreceiver exit */ + on_shmem_exit(WalRcvDie, 0); /* * If possible, make this process a group leader, so that the postmaster @@ -200,81 +235,21 @@ WalReceiverMain(void) /* We allow SIGQUIT (quickdie) at all times */ sigdelset(&BlockSig, SIGQUIT); + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + if (walrcv_connect == NULL || walrcv_receive == NULL || + walrcv_disconnect == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + /* * Create a resource owner to keep track of our resources (not clear that * we need this, but may as well have one). */ CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. 
- */ - walrcv_context = AllocSetContextCreate(TopMemoryContext, - "Wal Receiver", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(walrcv_context); - - /* - * If an exception is encountered, processing resumes here. - * - * This code is heavily based on bgwriter.c, q.v. - */ - if (sigsetjmp(local_sigjmp_buf, 1) != 0) - { - /* Since not using PG_TRY, must reset error stack by hand */ - error_context_stack = NULL; - - /* Reset WalRcvImmediateInterruptOK */ - DisableWalRcvImmediateExit(); - - /* Prevent interrupts while cleaning up */ - HOLD_INTERRUPTS(); - - /* Report the error to the server log */ - EmitErrorReport(); - - /* Disconnect any previous connection. */ - EnableWalRcvImmediateExit(); - walrcv_disconnect(); - DisableWalRcvImmediateExit(); - - /* - * Now return to normal top-level context and clear ErrorContext for - * next time. - */ - MemoryContextSwitchTo(walrcv_context); - FlushErrorState(); - - /* Flush any leaked data in the top-level context */ - MemoryContextResetAndDeleteChildren(walrcv_context); - - /* Now we can allow interrupts again */ - RESUME_INTERRUPTS(); - - /* - * Sleep at least 1 second after any error. A write error is likely - * to be repeated, and we don't want to be filling the error logs as - * fast as we can. 
- */ - pg_usleep(1000000L); - } - - /* We can now handle ereport(ERROR) */ - PG_exception_stack = &local_sigjmp_buf; - /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); - /* Fetch connection information from shared memory */ - SpinLockAcquire(&walrcv->mutex); - strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); - startpoint = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); - /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); walrcv_connect(conninfo, startpoint); @@ -330,63 +305,24 @@ WalReceiverMain(void) } } -/* Advertise our pid in shared memory, so that startup process can kill us. */ -static void -InitWalRcv(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * WalRcv should be set up already (if we are a backend, we inherit - * this by fork() or EXEC_BACKEND mechanism from the postmaster). - */ - if (walrcv == NULL) - elog(PANIC, "walreceiver control data uninitialized"); - - /* If we've already been requested to stop, don't start up */ - SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->pid == 0); - if (walrcv->walRcvState == WALRCV_STOPPED || - walrcv->walRcvState == WALRCV_STOPPING) - { - walrcv->walRcvState = WALRCV_STOPPED; - SpinLockRelease(&walrcv->mutex); - proc_exit(1); - } - walrcv->pid = MyProcPid; - SpinLockRelease(&walrcv->mutex); - - /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvKill, 0); -} - /* - * Clear our pid from shared memory at exit. + * Mark us as STOPPED in shared memory at exit. 
*/ static void -WalRcvKill(int code, Datum arg) +WalRcvDie(int code, Datum arg) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - bool stopped = false; SpinLockAcquire(&walrcv->mutex); - if (walrcv->walRcvState == WALRCV_STOPPING || - walrcv->walRcvState == WALRCV_STOPPED) - { - walrcv->walRcvState = WALRCV_STOPPED; - stopped = true; - elog(LOG, "walreceiver stopped"); - } + Assert(walrcv->walRcvState == WALRCV_RUNNING || + walrcv->walRcvState == WALRCV_STOPPING); + walrcv->walRcvState = WALRCV_STOPPED; walrcv->pid = 0; SpinLockRelease(&walrcv->mutex); + /* Terminate the connection gracefully. */ walrcv_disconnect(); - - /* If requested to stop, tell postmaster to not restart us. */ - if (stopped) - SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER); } /* SIGHUP: set flag to re-read config file at next convenient time */ diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index c1d7b55887..4fb132dcd4 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -18,6 +18,8 @@ #include <sys/types.h> #include <sys/stat.h> +#include <sys/time.h> +#include <time.h> #include <unistd.h> #include <signal.h> @@ -30,8 +32,11 @@ WalRcvData *WalRcv = NULL; -static bool CheckForStandbyTrigger(void); -static void ShutdownWalRcv(void); +/* + * How long to wait for walreceiver to start up after requesting + * postmaster to launch it. In seconds. 
+ */ +#define WALRCV_STARTUP_TIMEOUT 10 /* Report shared memory space needed by WalRcvShmemInit */ Size @@ -62,7 +67,7 @@ WalRcvShmemInit(void) /* Initialize the data structures */ MemSet(WalRcv, 0, WalRcvShmemSize()); - WalRcv->walRcvState = WALRCV_NOT_STARTED; + WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); } @@ -73,90 +78,51 @@ WalRcvInProgress(void) /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; WalRcvState state; + pg_time_t startTime; SpinLockAcquire(&walrcv->mutex); - state = walrcv->walRcvState; - SpinLockRelease(&walrcv->mutex); - if (state == WALRCV_RUNNING || state == WALRCV_STOPPING) - return true; - else - return false; -} - -/* - * Wait for the XLOG record at given position to become available. - * - * 'recptr' indicates the byte position which caller wants to read the - * XLOG record up to. The byte position actually written and flushed - * by walreceiver is returned. It can be higher than the requested - * location, and the caller can safely read up to that point without - * calling WaitNextXLogAvailable() again. - * - * If WAL streaming is ended (because a trigger file is found), *finished - * is set to true and function returns immediately. The returned position - * can be lower than requested in that case. - * - * Called by the startup process during streaming recovery. - */ -XLogRecPtr -WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished) -{ - static XLogRecPtr receivedUpto = {0, 0}; - - *finished = false; + state = walrcv->walRcvState; + startTime = walrcv->startTime; - /* Quick exit if already known available */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; + SpinLockRelease(&walrcv->mutex); - for (;;) + /* + * If it has taken too long for walreceiver to start up, give up. + * Setting the state to STOPPED ensures that if walreceiver later + * does start up after all, it will see that it's not supposed to be + * running and die without doing anything. 
+ */ + if (state == WALRCV_STARTING) { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* Update local status */ - SpinLockAcquire(&walrcv->mutex); - receivedUpto = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); + pg_time_t now = (pg_time_t) time(NULL); - /* If available already, leave here */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; - - /* Check to see if the trigger file exists */ - if (CheckForStandbyTrigger()) + if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { - *finished = true; - return receivedUpto; - } + SpinLockAcquire(&walrcv->mutex); - pg_usleep(100000L); /* 100ms */ - - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); + if (walrcv->walRcvState == WALRCV_STARTING) + state = walrcv->walRcvState = WALRCV_STOPPED; - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (!PostmasterIsAlive(true)) - exit(1); + SpinLockRelease(&walrcv->mutex); + } } + + if (state != WALRCV_STOPPED) + return true; + else + return false; } /* - * Stop walreceiver and wait for it to die. + * Stop walreceiver (if running) and wait for it to die. */ -static void +void ShutdownWalRcv(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - pid_t walrcvpid; + pid_t walrcvpid = 0; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -164,15 +130,25 @@ ShutdownWalRcv(void) * restart itself. 
*/ SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->walRcvState == WALRCV_RUNNING); - walrcv->walRcvState = WALRCV_STOPPING; - walrcvpid = walrcv->pid; + switch(walrcv->walRcvState) + { + case WALRCV_STOPPED: + break; + case WALRCV_STARTING: + walrcv->walRcvState = WALRCV_STOPPED; + break; + + case WALRCV_RUNNING: + walrcv->walRcvState = WALRCV_STOPPING; + /* fall through */ + case WALRCV_STOPPING: + walrcvpid = walrcv->pid; + break; + } SpinLockRelease(&walrcv->mutex); /* - * Pid can be 0, if no walreceiver process is active right now. - * Postmaster should restart it, and when it does, it will see the - * STOPPING state. + * Signal walreceiver process if it was still running. */ if (walrcvpid != 0) kill(walrcvpid, SIGTERM); @@ -194,30 +170,6 @@ ShutdownWalRcv(void) } /* - * Check to see if the trigger file exists. If it does, request postmaster - * to shut down walreceiver and wait for it to exit, and remove the trigger - * file. - */ -static bool -CheckForStandbyTrigger(void) -{ - struct stat stat_buf; - - if (TriggerFile == NULL) - return false; - - if (stat(TriggerFile, &stat_buf) == 0) - { - ereport(LOG, - (errmsg("trigger file found: %s", TriggerFile))); - ShutdownWalRcv(); - unlink(TriggerFile); - return true; - } - return false; -} - -/* * Request postmaster to start walreceiver. * * recptr indicates the position where streaming should begin, and conninfo @@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + pg_time_t now = (pg_time_t) time(NULL); - Assert(walrcv->walRcvState == WALRCV_NOT_STARTED); + /* + * We always start at the beginning of the segment. + * That prevents a broken segment (i.e., with no records in the + * first half of a segment) from being created by XLOG streaming, + * which might cause trouble later on if the segment is e.g + * archived. 
+ */ + if (recptr.xrecoff % XLogSegSize != 0) + recptr.xrecoff -= recptr.xrecoff % XLogSegSize; + + /* It better be stopped before we try to restart it */ + Assert(walrcv->walRcvState == WALRCV_STOPPED); - /* locking is just pro forma here; walreceiver isn't started yet */ SpinLockAcquire(&walrcv->mutex); - walrcv->receivedUpto = recptr; if (conninfo != NULL) strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else walrcv->conninfo[0] = '\0'; - walrcv->walRcvState = WALRCV_RUNNING; + walrcv->walRcvState = WALRCV_STARTING; + walrcv->startTime = now; + + walrcv->receivedUpto = recptr; SpinLockRelease(&walrcv->mutex); SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); @@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void) return recptr; } + |
