summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Andrew Dunstan <andrew@dunslane.net> 2017-12-14 11:32:25 -0500
committer: Andrew Dunstan <andrew@dunslane.net> 2017-12-14 11:32:25 -0500
commit: f5c7e0cddf6a970012bf4cfd18f00174f4d585b3 (patch)
tree: 8d949652e5a79d1f7e9216990a845f2ee0c7716d
parent: 239b01e313b5c3bc90d383b0a345e0549fe9ba96 (diff)
download: postgresql-f5c7e0cddf6a970012bf4cfd18f00174f4d585b3.tar.gz
Fix walsender timeouts when decoding a large transaction
The logical slots have a fast code path for sending data so as not to impose too high a per-message overhead. The fast path skips checks for interrupts and timeouts. However, the existing coding failed to consider the fact that a transaction with a large number of changes may take a very long time to be processed and sent to the client. This causes the walsender to ignore interrupts for potentially a long time and, more importantly, it will result in the walsender being killed due to timeout at the end of such a transaction.

This commit changes the fast path to also check for interrupts and only allows calling the fast path when the last keepalive check happened less than half the walsender timeout ago. Otherwise the slower code path will be taken.

Backpatched to 9.4.

Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, Craig Ringer and Robert Haas.

Discussion: https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
-rw-r--r-- src/backend/replication/walsender.c 67
1 files changed, 37 insertions, 30 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b24f9a1e95..3a06a4a307 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1066,6 +1066,9 @@ static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
bool last_write)
{
+ TimestampTz now;
+ int64 now_int;
+
/* output previously gathered data in a CopyData packet */
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
@@ -1075,53 +1078,35 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
* several releases by streaming physical replication.
*/
resetStringInfo(&tmpbuf);
- pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+ now_int = GetCurrentIntegerTimestamp();
+ now = IntegerTimestampToTimestampTz(now_int);
+ pq_sendint64(&tmpbuf, now_int);
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
- /* fast path */
+ CHECK_FOR_INTERRUPTS();
+
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
- if (!pq_is_send_pending())
+ /* Try taking fast path unless we get too close to walsender timeout. */
+ if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout / 2) &&
+ !pq_is_send_pending())
+ {
return;
+ }
+ /* If we have pending write here, go to slow path */
for (;;)
{
int wakeEvents;
long sleeptime;
- TimestampTz now;
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (!PostmasterIsAlive())
- exit(1);
-
- /* Process any requests or signals received recently */
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- SyncRepInitConfig();
- }
/* Check for input from the client */
ProcessRepliesIfAny();
- /* Clear any already-pending wakeups */
- ResetLatch(&MyWalSnd->latch);
-
- /* Try to flush pending output to the client */
- if (pq_flush_if_writable() != 0)
- WalSndShutdown();
-
- /* If we finished clearing the buffered data, we're done here. */
- if (!pq_is_send_pending())
- break;
-
now = GetCurrentTimestamp();
/* die if timeout was reached */
@@ -1130,6 +1115,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary(now);
+ if (!pq_is_send_pending())
+ break;
+
sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1141,6 +1129,25 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
MyProcPort->sock, sleeptime);
ImmediateInterruptOK = false;
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ /* Process any requests or signals received recently */
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ SyncRepInitConfig();
+ }
+
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ WalSndShutdown();
}
/* reactivate latch so WalSndLoop knows to continue */