diff options
| author | Simon Riggs <simon@2ndQuadrant.com> | 2011-12-31 13:30:26 +0000 |
|---|---|---|
| committer | Simon Riggs <simon@2ndQuadrant.com> | 2011-12-31 13:30:26 +0000 |
| commit | 64233902d22ba42846397cb7551894217522fad4 (patch) | |
| tree | 3d486d3a79b1ad543d99d726da4180e375437601 /src/backend/replication/walsender.c | |
| parent | 2ae2e9c00798685cd75ea0cc5120466bf2027b90 (diff) | |
| download | postgresql-64233902d22ba42846397cb7551894217522fad4.tar.gz | |
Send new protocol keepalive messages to standby servers.
Allows streaming replication users to calculate transfer latency
and apply delay via internal functions. No external functions yet.
Diffstat (limited to 'src/backend/replication/walsender.c')
| -rw-r--r-- | src/backend/replication/walsender.c | 42 |
1 files changed, 27 insertions, 15 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ea86520417..ed7298b6ee 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); +static void WalSndKeepalive(char *msgbuf); /* Main entry point for walsender process */ @@ -823,30 +824,24 @@ WalSndLoop(void) */ if (caughtup || pq_is_send_pending()) { - TimestampTz finish_time = 0; - long sleeptime = -1; + TimestampTz timeout = 0; + long sleeptime = 10000; /* 10 s */ int wakeEvents; wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | - WL_SOCKET_READABLE; + WL_SOCKET_READABLE | WL_TIMEOUT; + if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; + else + WalSndKeepalive(output_message); /* Determine time until replication timeout */ if (replication_timeout > 0) { - long secs; - int usecs; - - finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, replication_timeout); - TimestampDifference(GetCurrentTimestamp(), - finish_time, &secs, &usecs); - sleeptime = secs * 1000 + usecs / 1000; - /* Avoid Assert in WaitLatchOrSocket if timeout is past */ - if (sleeptime < 0) - sleeptime = 0; - wakeEvents |= WL_TIMEOUT; + sleeptime = 1 + (replication_timeout / 10); } /* Sleep until something happens or replication timeout */ @@ -859,7 +854,7 @@ WalSndLoop(void) * timeout ... he's supposed to reply *before* that. */ if (replication_timeout > 0 && - GetCurrentTimestamp() >= finish_time) + GetCurrentTimestamp() >= timeout) { /* * Since typically expiration of replication timeout means @@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) return (Datum) 0; } +static void +WalSndKeepalive(char *msgbuf) +{ + PrimaryKeepaliveMessage keepalive_message; + + /* Construct a new message */ + keepalive_message.walEnd = sentPtr; + keepalive_message.sendTime = GetCurrentTimestamp(); + + elog(DEBUG2, "sending replication keepalive"); + + /* Prepend with the message type and send it. */ + msgbuf[0] = 'k'; + memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage)); + pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1); +} + /* * This isn't currently used for anything. Monitoring tools might be * interested in the future, and we'll need something like this in the |
