summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2019-10-19 18:04:02 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2019-10-19 18:04:02 +0100
commitb0b09cbb246ccb61372eca96e276ff6c59f0bbff (patch)
treec36293f06bd763d123f5d9a2da86d7fe33318857
parent4097b4f2a0700041d33d66c874fa527c8b52e077 (diff)
parent63352d7da0cfc0a8e30577a3bb4e1b3504b1e0ab (diff)
downloadpsycopg2-b0b09cbb246ccb61372eca96e276ff6c59f0bbff.tar.gz
Merge branch 'bugfix/940'
-rw-r--r--NEWS2
-rw-r--r--psycopg/pqpath.c4
-rw-r--r--psycopg/replication_cursor.h1
-rw-r--r--psycopg/replication_cursor_type.c3
4 files changed, 9 insertions, 1 deletions
diff --git a/NEWS b/NEWS
index 02cd4d1..2bea4c5 100644
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,8 @@ What's new in psycopg 2.8.4
- Don't swallow keyboard interrupts on connect when a password is specified
in the connection string (:ticket:`#898`).
+- Don't advance replication cursor when the message wasn't confirmed
+ (:ticket:`#940`).
- Fixed int overflow for large values in `~psycopg2.extensions.Column.table_oid`
and `~psycopg2.extensions.Column.type_code` (:ticket:`961`).
- Fixed building with Python 3.8 (:ticket:`854`).
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 414c2d7..28774cd 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -1576,7 +1576,9 @@ retry:
/* We can safely forward flush_lsn to the wal_end from the server keepalive message
* if we know that the client already processed (confirmed) the last XLogData message */
- if (repl->flush_lsn >= repl->last_msg_data_start && wal_end > repl->flush_lsn) {
+ if (repl->explicitly_flushed_lsn >= repl->last_msg_data_start
+ && wal_end > repl->explicitly_flushed_lsn
+ && wal_end > repl->flush_lsn) {
repl->flush_lsn = wal_end;
}
diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h
index 1b92b09..ba066b5 100644
--- a/psycopg/replication_cursor.h
+++ b/psycopg/replication_cursor.h
@@ -52,6 +52,7 @@ typedef struct replicationCursorObject {
XLogRecPtr last_msg_data_start; /* WAL pointer to the last non-keepalive message from the server */
struct timeval last_feedback; /* timestamp of the last feedback message to the server */
+ XLogRecPtr explicitly_flushed_lsn; /* the flush LSN explicitly set by the send_feedback call */
} replicationCursorObject;
diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c
index 5fdeaf0..c1dbd43 100644
--- a/psycopg/replication_cursor_type.c
+++ b/psycopg/replication_cursor_type.c
@@ -211,6 +211,9 @@ send_feedback(replicationCursorObject *self,
if (write_lsn > self->write_lsn)
self->write_lsn = write_lsn;
+ if (flush_lsn > self->explicitly_flushed_lsn)
+ self->explicitly_flushed_lsn = flush_lsn;
+
if (flush_lsn > self->flush_lsn)
self->flush_lsn = flush_lsn;