Diffstat (limited to 'src/backend/port')
 -rw-r--r--   src/backend/port/Makefile        |   4
 -rw-r--r--   src/backend/port/unix_latch.c    | 435
 -rw-r--r--   src/backend/port/win32_latch.c   | 284
3 files changed, 721 insertions, 2 deletions
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index 2009919ad9..ed02523f38 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -13,7 +13,7 @@
 # be converted to Method 2.
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/port/Makefile,v 1.28 2010/07/05 18:54:37 tgl Exp $
+#    $PostgreSQL: pgsql/src/backend/port/Makefile,v 1.29 2010/09/11 15:48:04 heikki Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -21,7 +21,7 @@ subdir = src/backend/port
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = dynloader.o pg_sema.o pg_shmem.o pg_latch.o $(TAS)
 
 ifeq ($(PORTNAME), darwin)
 SUBDIRS += darwin
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
new file mode 100644
index 0000000000..ba136ea5ae
--- /dev/null
+++ b/src/backend/port/unix_latch.c
@@ -0,0 +1,435 @@
+/*-------------------------------------------------------------------------
+ *
+ * unix_latch.c
+ *    Routines for inter-process latches
+ *
+ * A latch is a boolean variable, with operations that let you sleep
+ * until it is set. A latch can be set from another process, or a signal
+ * handler within the same process.
+ *
+ * The latch interface is a reliable replacement for the common pattern of
+ * using pg_usleep() or select() to wait until a signal arrives, where the
+ * signal handler sets a global variable. Because on some platforms an
+ * incoming signal doesn't interrupt sleep, and even on platforms where it
+ * does there is a race condition if the signal arrives just before
+ * entering the sleep, the common pattern must periodically wake up and
+ * poll the global variable. The pselect() system call was invented to solve
+ * the problem, but it is not portable enough. Latches are designed to
+ * overcome these limitations, allowing you to sleep without polling and
+ * ensuring a quick response to signals from other processes.
+ *
+ * There are two kinds of latches: local and shared. A local latch is
+ * initialized by InitLatch, and can only be set from the same process.
+ * A local latch can be used to wait for a signal to arrive, by calling
+ * SetLatch in the signal handler. A shared latch resides in shared memory,
+ * and must be initialized at postmaster startup by InitSharedLatch. Before
+ * a shared latch can be waited on, it must be associated with a process
+ * with OwnLatch. Only the process owning the latch can wait on it, but any
+ * process can set it.
+ *
+ * There are three basic operations on a latch:
+ *
+ * SetLatch   - Sets the latch
+ * ResetLatch - Clears the latch, allowing it to be set again
+ * WaitLatch  - Waits for the latch to become set
+ *
+ * The correct pattern to wait for an event is:
+ *
+ * for (;;)
+ * {
+ *     ResetLatch();
+ *     if (work to do)
+ *         Do Stuff();
+ *
+ *     WaitLatch();
+ * }
+ *
+ * It's important to reset the latch *before* checking if there's work to
+ * do. Otherwise, if someone sets the latch between the check and the
+ * ResetLatch call, you will miss it and WaitLatch will block.
+ *
+ * To wake up the waiter, you must first set a global flag or something
+ * else that the main loop tests in the "if (work to do)" part, and call
+ * SetLatch *after* that. SetLatch is designed to return quickly if the
+ * latch is already set.
+ *
+ *
+ * Implementation
+ * --------------
+ *
+ * The Unix implementation uses the so-called self-pipe trick to overcome
+ * the race condition involved with select() and setting a global flag
+ * in the signal handler. When a latch is set and the current process
+ * is waiting for it, the signal handler wakes up the select() in
+ * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
+ * interrupt select() on all platforms, and even on platforms where it
+ * does, a signal that arrives just before the select() call does not
+ * prevent the select() from entering sleep. An incoming byte on a pipe,
+ * however, reliably interrupts the sleep, and makes select() return
+ * immediately if the signal arrives just before select() begins.
+ *
+ * When SetLatch is called from the same process that owns the latch,
+ * SetLatch writes the byte directly to the pipe. If it's owned by another
+ * process, SIGUSR1 is sent and the signal handler in the waiting process
+ * writes the byte to the pipe on behalf of the signaling process.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    $PostgreSQL: pgsql/src/backend/port/unix_latch.c,v 1.1 2010/09/11 15:48:04 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "miscadmin.h"
+#include "storage/latch.h"
+#include "storage/shmem.h"
+
+/* Are we currently in WaitLatch? The signal handler would like to know. */
+static volatile sig_atomic_t waiting = false;
+
+/* Read and write end of the self-pipe */
+static int selfpipe_readfd = -1;
+static int selfpipe_writefd = -1;
+
+/* private function prototypes */
+static void initSelfPipe(void);
+static void drainSelfPipe(void);
+static void sendSelfPipeByte(void);
+
+
+/*
+ * Initialize a backend-local latch.
+ */
+void
+InitLatch(volatile Latch *latch)
+{
+    /* Initialize the self pipe if this is our first latch in the process */
+    if (selfpipe_readfd == -1)
+        initSelfPipe();
+
+    latch->is_set = false;
+    latch->owner_pid = MyProcPid;
+    latch->is_shared = false;
+}
+
+/*
+ * Initialize a shared latch that can be set from other processes. The latch
+ * is initially owned by no one; use OwnLatch to associate it with the
+ * current process.
+ *
+ * NB: When you introduce a new shared latch, you must increase the shared
+ * latch count in NumSharedLatches in win32_latch.c!
+ */
+void
+InitSharedLatch(volatile Latch *latch)
+{
+    latch->is_set = false;
+    latch->owner_pid = 0;
+    latch->is_shared = true;
+}
+
+/*
+ * Associate a shared latch with the current process, allowing the current
+ * process to wait on it.
+ *
+ * Make sure that latch_sigusr1_handler() is called from the SIGUSR1 signal
+ * handler, as shared latches use SIGUSR1 for inter-process communication.
+ */
+void
+OwnLatch(volatile Latch *latch)
+{
+    Assert(latch->is_shared);
+
+    /* Initialize the self pipe if this is our first latch in the process */
+    if (selfpipe_readfd == -1)
+        initSelfPipe();
+
+    if (latch->owner_pid != 0)
+        elog(ERROR, "latch already owned");
+    latch->owner_pid = MyProcPid;
+}
+
+/*
+ * Disown a shared latch currently owned by the current process.
+ */
+void
+DisownLatch(volatile Latch *latch)
+{
+    Assert(latch->is_shared);
+    Assert(latch->owner_pid == MyProcPid);
+    latch->owner_pid = 0;
+}
+
+/*
+ * Wait for given latch to be set or until timeout is exceeded.
+ * If the latch is already set, the function returns immediately.
+ *
+ * The 'timeout' is given in microseconds, and -1 means wait forever.
+ * On some platforms, signals cause the timeout to be restarted, so beware
+ * that the function can sleep for several times longer than the specified
+ * timeout.
+ *
+ * The latch must be owned by the current process, i.e. it must be a
+ * backend-local latch initialized with InitLatch, or a shared latch
+ * associated with the current process by calling OwnLatch.
+ *
+ * Returns 'true' if the latch was set, or 'false' if timeout was reached.
+ */
+bool
+WaitLatch(volatile Latch *latch, long timeout)
+{
+    return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+}
+
+/*
+ * Like WaitLatch, but will also return when there's data available in
+ * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
+ * was set, or 2 if the socket became readable.
+ */
+int
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+{
+    struct timeval tv, *tvp = NULL;
+    fd_set input_mask;
+    int rc;
+    int result = 0;
+
+    if (latch->owner_pid != MyProcPid)
+        elog(ERROR, "cannot wait on a latch owned by another process");
+
+    /* Initialize timeout */
+    if (timeout >= 0)
+    {
+        tv.tv_sec = timeout / 1000000L;
+        tv.tv_usec = timeout % 1000000L;
+        tvp = &tv;
+    }
+
+    waiting = true;
+    for (;;)
+    {
+        int hifd;
+
+        /*
+         * Clear the pipe, and check if the latch is set already. If someone
+         * sets the latch between this and the select() below, the setter
+         * will write a byte to the pipe (or signal us and the signal handler
+         * will do that), and the select() will return immediately.
+         */
+        drainSelfPipe();
+        if (latch->is_set)
+        {
+            result = 1;
+            break;
+        }
+
+        FD_ZERO(&input_mask);
+        FD_SET(selfpipe_readfd, &input_mask);
+        hifd = selfpipe_readfd;
+        if (sock != PGINVALID_SOCKET)
+        {
+            FD_SET(sock, &input_mask);
+            if (sock > hifd)
+                hifd = sock;
+        }
+
+        rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+        if (rc < 0)
+        {
+            if (errno == EINTR)
+                continue;
+            ereport(ERROR,
+                    (errcode_for_socket_access(),
+                     errmsg("select() failed: %m")));
+        }
+        if (rc == 0)
+        {
+            /* timeout exceeded */
+            result = 0;
+            break;
+        }
+        if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+        {
+            result = 2;
+            break;              /* data available in socket */
+        }
+    }
+    waiting = false;
+
+    return result;
+}
+
+/*
+ * Sets a latch and wakes up anyone waiting on it. Returns quickly if the
+ * latch is already set.
+ */
+void
+SetLatch(volatile Latch *latch)
+{
+    pid_t owner_pid;
+
+    /* Quick exit if already set */
+    if (latch->is_set)
+        return;
+
+    latch->is_set = true;
+
+    /*
+     * See if anyone's waiting for the latch. It can be the current process
+     * if we're in a signal handler. We use the self-pipe to wake up the
+     * select() in that case. If it's another process, send a signal.
+     *
+     * Fetch owner_pid only once, in case the owner simultaneously disowns
+     * the latch and clears owner_pid. XXX: This assumes that pid_t is
+     * atomic, which isn't guaranteed to be true! In practice, the effective
+     * range of pid_t fits in a 32-bit integer, and so should be atomic. In
+     * the worst case, we might end up signaling the wrong process if the
+     * right one disowns the latch just as we fetch owner_pid. Even then,
+     * you're very unlucky if a process with that bogus pid exists.
+     */
+    owner_pid = latch->owner_pid;
+    if (owner_pid == 0)
+        return;
+    else if (owner_pid == MyProcPid)
+        sendSelfPipeByte();
+    else
+        kill(owner_pid, SIGUSR1);
+}
+
+/*
+ * Clear the latch. Calling WaitLatch after this will sleep, unless
+ * the latch is set again before the WaitLatch call.
+ */
+void
+ResetLatch(volatile Latch *latch)
+{
+    /* Only the owner should reset the latch */
+    Assert(latch->owner_pid == MyProcPid);
+
+    latch->is_set = false;
+}
+
+/*
+ * LatchShmemSize
+ *    Compute space needed for latch's shared memory
+ *
+ * Not needed for the Unix implementation.
+ */
+Size
+LatchShmemSize(void)
+{
+    return 0;
+}
+
+/*
+ * LatchShmemInit
+ *    Allocate and initialize shared memory needed for latches
+ *
+ * Not needed for the Unix implementation.
+ */
+void
+LatchShmemInit(void)
+{
+}
+
+/*
+ * SetLatch uses SIGUSR1 to wake up the process waiting on a shared latch.
+ * If this process is currently sleeping in WaitLatch, wake it up.
+ */
+void
+latch_sigusr1_handler(void)
+{
+    if (waiting)
+        sendSelfPipeByte();
+}
+
+/* initialize the self-pipe */
+static void
+initSelfPipe(void)
+{
+    int pipefd[2];
+
+    /*
+     * Set up the self-pipe that allows a signal handler to wake up the
+     * select() in WaitLatch. Make the write-end non-blocking, so that
+     * SetLatch won't block if the pipe's kernel buffer is already full
+     * because the latch has been set many times. Make the read-end
+     * non-blocking too, so that we can easily clear the pipe by reading
+     * until EAGAIN or EWOULDBLOCK.
+     */
+    if (pipe(pipefd) < 0)
+        elog(FATAL, "pipe() failed: %m");
+    if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
+        elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
+    if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
+        elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");
+
+    selfpipe_readfd = pipefd[0];
+    selfpipe_writefd = pipefd[1];
+}
+
+/* Send one byte to the self-pipe, to wake up WaitLatch */
+static void
+sendSelfPipeByte(void)
+{
+    int rc;
+    char dummy = 0;
+
+retry:
+    rc = write(selfpipe_writefd, &dummy, 1);
+    if (rc < 0)
+    {
+        /* If interrupted by signal, just retry */
+        if (errno == EINTR)
+            goto retry;
+
+        /*
+         * If the pipe is full, we don't need to retry; the data that's
+         * there already is enough to wake up WaitLatch.
+         */
+        if (errno == EAGAIN || errno == EWOULDBLOCK)
+            return;
+
+        /*
+         * Oops, the write() failed for some other reason. We might be in
+         * a signal handler, so it's not safe to elog(). We have no choice
+         * but to silently ignore the error.
+         */
+        return;
+    }
+}
+
+/* Read all available data from the self-pipe */
+static void
+drainSelfPipe(void)
+{
+    /*
+     * There shouldn't normally be more than one byte in the pipe, or maybe
+     * a few more if multiple processes run SetLatch at the same instant.
+     */
+    char buf[16];
+    int rc;
+
+    for (;;)
+    {
+        rc = read(selfpipe_readfd, buf, sizeof(buf));
+        if (rc < 0)
+        {
+            if (errno == EAGAIN || errno == EWOULDBLOCK)
+                break;          /* the pipe is empty */
+            else if (errno == EINTR)
+                continue;       /* retry */
+            else
+                elog(ERROR, "read() on self-pipe failed: %m");
+        }
+        else if (rc == 0)
+            elog(ERROR, "unexpected EOF on self-pipe");
+    }
+}
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
new file mode 100644
index 0000000000..f6d4920e46
--- /dev/null
+++ b/src/backend/port/win32_latch.c
@@ -0,0 +1,284 @@
+/*-------------------------------------------------------------------------
+ *
+ * win32_latch.c
+ *    Windows implementation of latches.
+ *
+ * The Windows implementation uses Windows events. See unix_latch.c for
+ * information on usage.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    $PostgreSQL: pgsql/src/backend/port/win32_latch.c,v 1.1 2010/09/11 15:48:04 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "miscadmin.h"
+#include "replication/walsender.h"
+#include "storage/latch.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+/*
+ * Shared latches are implemented with Windows events that are shared by
+ * all postmaster child processes. At postmaster startup we create enough
+ * Event objects, and mark them as inheritable so that they are accessible
+ * in child processes. The handles are stored in sharedHandles.
+ */
+typedef struct
+{
+    slock_t mutex;          /* protects all the other fields */
+
+    int maxhandles;         /* number of shared handles created */
+    int nfreehandles;       /* number of free handles in array */
+    HANDLE handles[1];      /* free handles, variable length */
+} SharedEventHandles;
+
+static SharedEventHandles *sharedHandles;
+
+void
+InitLatch(volatile Latch *latch)
+{
+    latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
+    latch->is_shared = false;
+    latch->is_set = false;
+}
+
+void
+InitSharedLatch(volatile Latch *latch)
+{
+    latch->is_shared = true;
+    latch->is_set = false;
+    latch->event = NULL;
+}
+
+void
+OwnLatch(volatile Latch *latch)
+{
+    HANDLE event;
+
+    /* Sanity checks */
+    Assert(latch->is_shared);
+    if (latch->event != 0)
+        elog(ERROR, "latch already owned");
+
+    /* Reserve an event handle from the shared handles array */
+    SpinLockAcquire(&sharedHandles->mutex);
+    if (sharedHandles->nfreehandles <= 0)
+    {
+        SpinLockRelease(&sharedHandles->mutex);
+        elog(ERROR, "out of shared event objects");
+    }
+    sharedHandles->nfreehandles--;
+    event = sharedHandles->handles[sharedHandles->nfreehandles];
+    SpinLockRelease(&sharedHandles->mutex);
+
+    latch->event = event;
+}
+
+void
+DisownLatch(volatile Latch *latch)
+{
+    Assert(latch->is_shared);
+    Assert(latch->event != NULL);
+
+    /* Put the event handle back to the pool */
+    SpinLockAcquire(&sharedHandles->mutex);
+    if (sharedHandles->nfreehandles >= sharedHandles->maxhandles)
+    {
+        SpinLockRelease(&sharedHandles->mutex);
+        elog(PANIC, "too many free event handles");
+    }
+    sharedHandles->handles[sharedHandles->nfreehandles] = latch->event;
+    sharedHandles->nfreehandles++;
+    SpinLockRelease(&sharedHandles->mutex);
+
+    latch->event = NULL;
+}
+
+bool
+WaitLatch(volatile Latch *latch, long timeout)
+{
+    return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+}
+
+int
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+{
+    DWORD rc;
+    HANDLE events[3];
+    HANDLE latchevent;
+    HANDLE sockevent;
+    int numevents;
+    int result = 0;
+
+    latchevent = latch->event;
+
+    events[0] = latchevent;
+    events[1] = pgwin32_signal_event;
+    numevents = 2;
+    if (sock != PGINVALID_SOCKET)
+    {
+        sockevent = WSACreateEvent();
+        WSAEventSelect(sock, sockevent, FD_READ);
+        events[numevents++] = sockevent;
+    }
+
+    for (;;)
+    {
+        /*
+         * Reset the event, and check if the latch is set already. If someone
+         * sets the latch between this and the WaitForMultipleObjects() call
+         * below, the setter will set the event and WaitForMultipleObjects()
+         * will return immediately.
+         */
+        if (!ResetEvent(latchevent))
+            elog(ERROR, "ResetEvent failed: error code %d", (int) GetLastError());
+        if (latch->is_set)
+        {
+            result = 1;
+            break;
+        }
+
+        rc = WaitForMultipleObjects(numevents, events, FALSE,
+                                    (timeout >= 0) ? (timeout / 1000) : INFINITE);
+        if (rc == WAIT_FAILED)
+            elog(ERROR, "WaitForMultipleObjects() failed: error code %d", (int) GetLastError());
+        else if (rc == WAIT_TIMEOUT)
+        {
+            result = 0;
+            break;
+        }
+        else if (rc == WAIT_OBJECT_0 + 1)
+            pgwin32_dispatch_queued_signals();
+        else if (rc == WAIT_OBJECT_0 + 2)
+        {
+            Assert(sock != PGINVALID_SOCKET);
+            result = 2;
+            break;
+        }
+        else if (rc != WAIT_OBJECT_0)
+            elog(ERROR, "unexpected return code from WaitForMultipleObjects(): %d", rc);
+    }
+
+    /* Clean up the handle we created for the socket */
+    if (sock != PGINVALID_SOCKET)
+    {
+        WSAEventSelect(sock, sockevent, 0);
+        WSACloseEvent(sockevent);
+    }
+
+    return result;
+}
+
+void
+SetLatch(volatile Latch *latch)
+{
+    HANDLE handle;
+
+    /* Quick exit if already set */
+    if (latch->is_set)
+        return;
+
+    latch->is_set = true;
+
+    /*
+     * See if anyone's waiting for the latch. It can be the current process
+     * if we're in a signal handler. Use a local variable here in case the
+     * latch is disowned between the test and the SetEvent call, and the
+     * event field reset to NULL.
+     *
+     * Fetch latch->event only once, in case the owner simultaneously
+     * disowns the latch and clears it. This assumes that HANDLE is
+     * atomic, which isn't guaranteed to be true! In practice, it should be,
+     * and in the worst case we end up calling SetEvent with a bogus handle,
+     * and SetEvent will return an error with no harm done.
+     */
+    handle = latch->event;
+    if (handle)
+    {
+        SetEvent(handle);
+        /*
+         * Note that we silently ignore any errors. We might be in a signal
+         * handler or other critical path where it's not safe to call elog().
+         */
+    }
+}
+
+void
+ResetLatch(volatile Latch *latch)
+{
+    latch->is_set = false;
+}
+
+/*
+ * Number of shared latches, used to allocate the right number of shared
+ * Event handles at postmaster startup. You must update this if you
+ * introduce a new shared latch!
+ */
+static int
+NumSharedLatches(void)
+{
+    int numLatches = 0;
+
+    /* Each walsender needs one latch */
+    numLatches += max_wal_senders;
+
+    return numLatches;
+}
+
+/*
+ * LatchShmemSize
+ *    Compute space needed for latch's shared memory
+ */
+Size
+LatchShmemSize(void)
+{
+    return offsetof(SharedEventHandles, handles) +
+        NumSharedLatches() * sizeof(HANDLE);
+}
+
+/*
+ * LatchShmemInit
+ *    Allocate and initialize shared memory needed for latches
+ */
+void
+LatchShmemInit(void)
+{
+    Size size = LatchShmemSize();
+    bool found;
+
+    sharedHandles = ShmemInitStruct("SharedEventHandles", size, &found);
+
+    /* If we're first, initialize the struct and allocate handles */
+    if (!found)
+    {
+        int i;
+        SECURITY_ATTRIBUTES sa;
+
+        /*
+         * Set up security attributes to specify that the events are
+         * inherited.
+         */
+        ZeroMemory(&sa, sizeof(sa));
+        sa.nLength = sizeof(sa);
+        sa.bInheritHandle = TRUE;
+
+        SpinLockInit(&sharedHandles->mutex);
+        sharedHandles->maxhandles = NumSharedLatches();
+        sharedHandles->nfreehandles = sharedHandles->maxhandles;
+        for (i = 0; i < sharedHandles->maxhandles; i++)
+        {
+            sharedHandles->handles[i] = CreateEvent(&sa, TRUE, FALSE, NULL);
+            if (sharedHandles->handles[i] == NULL)
+                elog(ERROR, "CreateEvent failed: error code %d", (int) GetLastError());
+        }
+    }
+}
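
Editor's note: the reset-check-wait pattern prescribed in the unix_latch.c header comment is pseudocode. The sketch below fleshes it out as real C, as a caller of this patch's API might look. Only InitLatch, SetLatch, ResetLatch, WaitLatch and the storage/latch.h header come from the patch; the latch variable, the got_work flag, the handler name and the 10-second timeout are hypothetical and for illustration only, and the code is meant to be compiled as backend code, not stand-alone.

/* Hypothetical consumer of the latch API added by this commit. */
#include "postgres.h"

#include <signal.h>

#include "storage/latch.h"

static Latch myLatch;                           /* hypothetical backend-local latch */
static volatile sig_atomic_t got_work = false;  /* hypothetical work flag */

/*
 * Signal handler (registration elided): note the work first, then set the
 * latch, in that order, so the main loop cannot miss the wakeup.
 */
static void
work_available_handler(int signo)
{
    (void) signo;
    got_work = true;
    SetLatch(&myLatch);
}

static void
main_loop(void)
{
    InitLatch(&myLatch);

    for (;;)
    {
        /* Reset first, so a SetLatch between the check and the wait isn't lost */
        ResetLatch(&myLatch);

        if (got_work)
        {
            got_work = false;
            /* ... do the actual work here ... */
        }

        /* Sleep until the latch is set, or for at most 10 seconds (in microseconds) */
        (void) WaitLatch(&myLatch, 10000000L);
    }
}

For a shared latch the loop is the same, except the latch lives in shared memory, the waiter must call OwnLatch first, and latch_sigusr1_handler() must be invoked from the process's SIGUSR1 handler so that remote SetLatch calls can wake the select().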
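
Editor's note: the self-pipe trick described in the Implementation section can also be demonstrated outside the backend. The following stand-alone sketch (illustrative only, not part of the patch) uses a non-blocking pipe plus a SIGUSR1 handler so that a signal arriving at any moment, even just before select() is entered, reliably wakes the wait loop.

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <sys/select.h>
#include <unistd.h>

static int pipefd[2];                       /* [0] = read end, [1] = write end */
static volatile sig_atomic_t flag = 0;      /* the "latch", set by the handler */

static void
handle_sigusr1(int signo)
{
    (void) signo;
    flag = 1;
    /* Wake up select(); ignore errors, a full pipe already wakes the waiter */
    (void) write(pipefd[1], "x", 1);
}

int
main(void)
{
    char buf[16];

    if (pipe(pipefd) < 0)
        return 1;
    fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
    fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
    signal(SIGUSR1, handle_sigusr1);

    printf("send SIGUSR1 to pid %d (Ctrl-C to quit)\n", (int) getpid());

    for (;;)
    {
        fd_set input_mask;

        /* Drain the pipe first, then check the flag, in that order */
        while (read(pipefd[0], buf, sizeof(buf)) > 0)
            ;
        if (flag)
        {
            flag = 0;
            printf("woken up\n");
            continue;
        }

        /* If a signal arrived after the drain, its byte is still in the pipe,
         * so this select() returns immediately instead of sleeping forever. */
        FD_ZERO(&input_mask);
        FD_SET(pipefd[0], &input_mask);
        if (select(pipefd[0] + 1, &input_mask, NULL, NULL, NULL) < 0 &&
            errno != EINTR)
            return 1;
    }
}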
