summaryrefslogtreecommitdiff
path: root/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c317
1 files changed, 317 insertions, 0 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
new file mode 100644
index 0000000000..54b86fd135
--- /dev/null
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -0,0 +1,317 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqwalreceiver.c
+ *
+ * This file contains the libpq-specific parts of walreceiver. It's
+ * loaded as a dynamic module to avoid linking the main server binary with
+ * libpq.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/time.h>
+
+#include "libpq-fe.h"
+#include "access/xlog.h"
+#include "miscadmin.h"
+#include "replication/walreceiver.h"
+#include "utils/builtins.h"
+
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#ifdef HAVE_SYS_POLL_H
+#include <sys/poll.h>
+#endif
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+/*
+ * Current connection to the primary, if any.
+ *
+ * streamConn is assigned by libpqrcv_connect() and reset to NULL by
+ * libpqrcv_disconnect().
+ *
+ * justconnected is set to true at the end of a successful
+ * libpqrcv_connect() and cleared again by the next libpqrcv_receive()
+ * call (and by libpqrcv_disconnect()).  While it is true,
+ * libpqrcv_receive() skips waiting on the socket.
+ */
+static PGconn *streamConn = NULL;
+static bool justconnected = false;
+
+/*
+ * Buffer for currently read records.
+ *
+ * Filled in by PQgetCopyData() inside libpqrcv_receive(), and released
+ * with PQfreemem() at the start of the following libpqrcv_receive() call.
+ */
+static char *recvBuf = NULL;
+
+/*
+ * Prototypes for interface functions.
+ *
+ * _PG_init() stores these in the walrcv_connect, walrcv_receive and
+ * walrcv_disconnect function pointers.
+ */
+static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
+ int *len);
+static void libpqrcv_disconnect(void);
+
+/* Prototypes for private functions (helpers used only inside this file) */
+static bool libpq_select(int timeout_ms);
+
+/*
+ * Module load callback.
+ *
+ * Publishes this module's connect/receive/disconnect implementations
+ * through the walrcv_* function pointers.  If any of those pointers is
+ * already set, the module is considered loaded and an error is raised
+ * instead of overwriting them.
+ */
+void
+_PG_init(void)
+{
+	bool		hooks_taken;
+
+	/* Any non-NULL hook means somebody registered before us */
+	hooks_taken = (walrcv_connect != NULL ||
+				   walrcv_receive != NULL ||
+				   walrcv_disconnect != NULL);
+	if (hooks_taken)
+		elog(ERROR, "libpqwalreceiver already loaded");
+
+	/* Tell walreceiver how to reach us */
+	walrcv_connect = libpqrcv_connect;
+	walrcv_receive = libpqrcv_receive;
+	walrcv_disconnect = libpqrcv_disconnect;
+}
+
+/*
+ * Establish the connection to the primary server for XLOG streaming.
+ *
+ * Steps:
+ *	1. Connect with 'conninfo' plus an appended " replication=true" option.
+ *	2. Run IDENTIFY_SYSTEM and check that the primary's system identifier
+ *	   equals ours and that its timeline equals the recovery target
+ *	   timeline.
+ *	3. Issue START_REPLICATION at 'startpoint' and expect the connection to
+ *	   enter COPY OUT mode.
+ *
+ * 'startpoint' must not be 0/0 (asserted).
+ *
+ * Returns true on success.  Every failure is reported via ereport(ERROR).
+ * Any PGresult obtained here is released before an error is raised, and
+ * values needed for the error message are copied out of the result first.
+ */
+static bool
+libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+{
+	/*
+	 * Room for a conninfo of up to MAXCONNINFO bytes followed by the
+	 * replication option; sizeof() of the literal includes its NUL.
+	 */
+	char		conninfo_repl[MAXCONNINFO + sizeof(" replication=true")];
+	char	   *primary_sysid;
+	char	   *primary_tli_str;
+	char		standby_sysid[32];
+	TimeLineID	primary_tli;
+	TimeLineID	standby_tli;
+	PGresult   *res;
+	char		cmd[64];
+
+	Assert(startpoint.xlogid != 0 || startpoint.xrecoff != 0);
+
+	/* Connect */
+	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
+
+	streamConn = PQconnectdb(conninfo_repl);
+	if (PQstatus(streamConn) != CONNECTION_OK)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server : %s",
+						PQerrorMessage(streamConn))));
+
+	/*
+	 * Get the system identifier and timeline ID as a DataRow message
+	 * from the primary server.
+	 */
+	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not receive the SYSID and timeline ID from "
+						"the primary server: %s",
+						PQerrorMessage(streamConn))));
+	}
+	if (PQnfields(res) != 2 || PQntuples(res) != 1)
+	{
+		int			ntuples = PQntuples(res);
+		int			nfields = PQnfields(res);
+
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("invalid response from primary server"),
+				 errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
+						   ntuples, nfields)));
+	}
+
+	/*
+	 * PQgetvalue() returns pointers into the result's own storage, which
+	 * become invalid after PQclear().  Copy both fields so the result can
+	 * be released right away, before any check below that may raise an
+	 * error and before the values are used in error messages.
+	 */
+	primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
+	primary_tli_str = pstrdup(PQgetvalue(res, 0, 1));
+	PQclear(res);
+
+	primary_tli = pg_atoi(primary_tli_str, 4, 0);
+	pfree(primary_tli_str);
+
+	/*
+	 * Confirm that the system identifier of the primary is the same
+	 * as ours.
+	 */
+	snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+			 GetSystemIdentifier());
+	if (strcmp(primary_sysid, standby_sysid) != 0)
+		ereport(ERROR,
+				(errmsg("system differs between the primary and standby"),
+				 errdetail("the primary SYSID is %s, standby SYSID is %s",
+						   primary_sysid, standby_sysid)));
+	pfree(primary_sysid);
+
+	/*
+	 * Confirm that the current timeline of the primary is the same
+	 * as the recovery target timeline.
+	 */
+	standby_tli = GetRecoveryTargetTLI();
+	if (primary_tli != standby_tli)
+		ereport(ERROR,
+				(errmsg("timeline %u of the primary does not match recovery target timeline %u",
+						primary_tli, standby_tli)));
+	ThisTimeLineID = primary_tli;
+
+	/* Start streaming from the point requested by startup process */
+	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
+			 startpoint.xlogid, startpoint.xrecoff);
+	res = PQexec(streamConn, cmd);
+	if (PQresultStatus(res) != PGRES_COPY_OUT)
+	{
+		/* The error text lives in the connection, so res can go first */
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not start XLOG streaming: %s",
+						PQerrorMessage(streamConn))));
+	}
+	PQclear(res);
+
+	/* Let the first libpqrcv_receive() call skip waiting on the socket */
+	justconnected = true;
+
+	return true;
+}
+
+/*
+ * Block until the replication socket is readable or the timeout expires.
+ *
+ * timeout_ms is handed to poll(2) as-is; in the select(2) variant a
+ * negative value means "no timeout".
+ *
+ * Returns true when the wait reported the socket ready, false when it
+ * timed out or was interrupted by a signal (EINTR).  Any other failure,
+ * or a connection without an open socket, is reported via ereport(ERROR).
+ *
+ * This is based on pqSocketCheck.
+ */
+static bool
+libpq_select(int timeout_ms)
+{
+	int			sock;
+	int			nready;
+
+	Assert(streamConn != NULL);
+
+	sock = PQsocket(streamConn);
+	if (sock < 0)
+		ereport(ERROR,
+				(errcode_for_socket_access(),
+				 errmsg("socket not open")));
+
+	/* We use poll(2) if available, otherwise select(2) */
+#ifdef HAVE_POLL
+	{
+		struct pollfd pfd;
+
+		pfd.fd = sock;
+		pfd.events = POLLIN | POLLERR;
+		pfd.revents = 0;
+
+		nready = poll(&pfd, 1, timeout_ms);
+	}
+#else							/* !HAVE_POLL */
+	{
+		fd_set		readfds;
+		struct timeval tv;
+		struct timeval *tvp = NULL; /* NULL: wait without a time limit */
+
+		FD_ZERO(&readfds);
+		FD_SET(sock, &readfds);
+
+		if (timeout_ms >= 0)
+		{
+			tv.tv_sec = timeout_ms / 1000;
+			tv.tv_usec = (timeout_ms % 1000) * 1000;
+			tvp = &tv;
+		}
+
+		nready = select(sock + 1, &readfds, NULL, NULL, tvp);
+	}
+#endif   /* HAVE_POLL */
+
+	/* Ready descriptor: data can be read */
+	if (nready > 0)
+		return true;
+
+	/* Timeout, or the wait was cut short by a signal */
+	if (nready == 0 || errno == EINTR)
+		return false;
+
+	/* Genuine failure of the wait call */
+	ereport(ERROR,
+			(errcode_for_socket_access(),
+			 errmsg("select() failed: %m")));
+	return true;
+}
+
+/*
+ * Close the connection to the primary, if there is one, and reset the
+ * per-connection state (connection pointer and the "just connected" flag).
+ */
+static void
+libpqrcv_disconnect(void)
+{
+	PGconn	   *conn = streamConn;
+
+	/* Forget the connection first, then hand it to libpq for teardown */
+	streamConn = NULL;
+	justconnected = false;
+	PQfinish(conn);
+}
+
+/*
+ * Receive any WAL records available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * Returns:
+ *
+ * True if data was received. *recptr, *buffer and *len are set to
+ * the WAL location of the received data, buffer holding it, and length,
+ * respectively.
+ *
+ * False if no data was available within timeout, or wait was interrupted
+ * by signal.
+ *
+ * Each message starts with an XLogRecPtr header giving the WAL location;
+ * the bytes after it are the payload returned in *buffer.
+ *
+ * The buffer returned is only valid until the next call of this function or
+ * libpqrcv_connect/disconnect.
+ *
+ * ereports on error.
+ */
+static bool
+libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+{
+	int			rawlen;
+
+	/* Release the message handed out by the previous call */
+	if (recvBuf != NULL)
+		PQfreemem(recvBuf);
+	recvBuf = NULL;
+
+	/*
+	 * If the caller requested to block, wait for data to arrive. But if
+	 * this is the first call after connecting, don't wait, because
+	 * there might already be some data in libpq buffer that we haven't
+	 * returned to caller.
+	 */
+	if (timeout > 0 && !justconnected)
+	{
+		if (!libpq_select(timeout))
+			return false;
+
+		if (PQconsumeInput(streamConn) == 0)
+			ereport(ERROR,
+					(errmsg("could not read xlog records: %s",
+							PQerrorMessage(streamConn))));
+	}
+	justconnected = false;
+
+	/* Receive CopyData message (async mode: never blocks) */
+	rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+	if (rawlen == 0)			/* no records available yet, then return */
+		return false;
+	if (rawlen == -1)			/* end-of-streaming or error */
+	{
+		PGresult   *res;
+		bool		ended_cleanly;
+
+		/* Inspect the final result, then release it exactly once */
+		res = PQgetResult(streamConn);
+		ended_cleanly = (PQresultStatus(res) == PGRES_COMMAND_OK);
+		PQclear(res);
+
+		if (ended_cleanly)
+			ereport(ERROR,
+					(errmsg("replication terminated by primary server")));
+		ereport(ERROR,
+				(errmsg("could not read xlog records: %s",
+						PQerrorMessage(streamConn))));
+	}
+	if (rawlen < -1)
+		ereport(ERROR,
+				(errmsg("could not read xlog records: %s",
+						PQerrorMessage(streamConn))));
+
+	/* rawlen is positive here; compare as int to avoid mixed signedness */
+	if (rawlen < (int) sizeof(XLogRecPtr))
+		ereport(ERROR,
+				(errmsg("invalid WAL message received from primary")));
+
+	/*
+	 * Return received WAL records to caller.  The header is copied out
+	 * with memcpy rather than read through a cast pointer, so nothing is
+	 * assumed about the alignment of the byte buffer.
+	 */
+	memcpy(recptr, recvBuf, sizeof(XLogRecPtr));
+	*buffer = recvBuf + sizeof(XLogRecPtr);
+	*len = rawlen - (int) sizeof(XLogRecPtr);
+
+	return true;
+}