diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2011-03-06 22:49:16 +0000 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2011-03-06 22:49:16 +0000 |
commit | a8a8a3e0965201df88bdfdff08f50e5c06c552b7 (patch) | |
tree | c29687748fa9d5e9bc335e11bf3d8446563184c3 /src | |
parent | 149b2673c244b92b59411dd2292d6ddcfb03d5d4 (diff) | |
download | postgresql-a8a8a3e0965201df88bdfdff08f50e5c06c552b7.tar.gz |
Efficient transaction-controlled synchronous replication.
If a standby is broadcasting reply messages and we have named
one or more standbys in synchronous_standby_names then allow
users who set synchronous_replication to wait for commit, which
then provides strict data integrity guarantees. The design avoids
sending and receiving transaction state information, which minimises
bookkeeping overheads. We synchronize with the highest-priority
standby that is connected and ready to synchronize. Other standbys
can be defined to take over in case of standby failure.
This version has very strict behaviour; more relaxed options
may be added at a later date.
Simon Riggs and Fujii Masao, with reviews by Yeb Havinga, Jaime
Casanova, Heikki Linnakangas and Robert Haas, plus the assistance
of many other design reviewers.
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/twophase.c | 25 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 11 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 4 | ||||
-rw-r--r-- | src/backend/postmaster/autovacuum.c | 7 | ||||
-rw-r--r-- | src/backend/postmaster/postmaster.c | 3 | ||||
-rw-r--r-- | src/backend/replication/Makefile | 2 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 9 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 65 | ||||
-rw-r--r-- | src/backend/storage/ipc/shmqueue.c | 21 | ||||
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 12 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 19 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 11 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.h | 2 | ||||
-rw-r--r-- | src/include/replication/walsender.h | 22 | ||||
-rw-r--r-- | src/include/storage/lwlock.h | 1 | ||||
-rw-r--r-- | src/include/storage/proc.h | 14 | ||||
-rw-r--r-- | src/include/storage/shmem.h | 3 | ||||
-rw-r--r-- | src/test/regress/expected/rules.out | 2 |
18 files changed, 213 insertions, 20 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 287ad26698..729c7b72e0 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -56,6 +56,7 @@ #include "pg_trace.h" #include "pgstat.h" #include "replication/walsender.h" +#include "replication/syncrep.h" #include "storage/fd.h" #include "storage/predicate.h" #include "storage/procarray.h" @@ -1071,6 +1072,14 @@ EndPrepare(GlobalTransaction gxact) END_CRIT_SECTION(); + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked the prepare, but still show as + * running in the procarray (twice!) and continue to hold locks. + */ + SyncRepWaitForLSN(gxact->prepare_lsn); + records.tail = records.head = NULL; } @@ -2030,6 +2039,14 @@ RecordTransactionCommitPrepared(TransactionId xid, MyProc->inCommit = false; END_CRIT_SECTION(); + + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(recptr); } /* @@ -2109,4 +2126,12 @@ RecordTransactionAbortPrepared(TransactionId xid, TransactionIdAbortTree(xid, nchildren, children); END_CRIT_SECTION(); + + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. 
+ */ + SyncRepWaitForLSN(recptr); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 4b407015df..c8b582cce8 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -37,6 +37,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/walsender.h" +#include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" @@ -1055,7 +1056,7 @@ RecordTransactionCommit(void) * if all to-be-deleted tables are temporary though, since they are lost * anyway if we crash.) */ - if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0) + if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested()) { /* * Synchronous commit case: @@ -1125,6 +1126,14 @@ RecordTransactionCommit(void) /* Compute latestXid while we have the child XIDs handy */ latestXid = TransactionIdLatest(xid, nchildren, children); + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. 
+ */ + SyncRepWaitForLSN(XactLastRecEnd); + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd.xrecoff = 0; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c7f43afd81..3f7d7d913a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -520,7 +520,9 @@ CREATE VIEW pg_stat_replication AS W.sent_location, W.write_location, W.flush_location, - W.replay_location + W.replay_location, + W.sync_priority, + W.sync_state FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W WHERE S.usesysid = U.oid AND diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 7307c4177c..efc8e7cc82 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -1527,6 +1527,13 @@ AutoVacWorkerMain(int argc, char *argv[]) SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); /* + * Force synchronous replication off to allow regular maintenance even + * if we are waiting for standbys to connect. This is important to + * ensure we aren't blocked from performing anti-wraparound tasks. + */ + SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE); + + /* * Get the info about the database we're going to work on. 
*/ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 997af5bf07..372fec7560 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -735,6 +735,9 @@ PostmasterMain(int argc, char *argv[]) if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\""))); + if (strlen(SyncRepStandbyNames) > 0 && max_wal_senders == 0) + ereport(ERROR, + (errmsg("Synchronous replication requires WAL streaming (max_wal_senders > 0)"))); /* * Other one-time internal sanity checks can go here, if they are fast. diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 42c6eaf26c..3fe490e580 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -13,7 +13,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ - repl_gram.o + repl_gram.o syncrep.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 32a1575ab0..47a980db20 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -317,13 +317,9 @@ WalReceiverMain(void) while (walrcv_receive(0, &type, &buf, &len)) XLogWalRcvProcessMsg(type, buf, len); - /* Let the master know that we received some data. */ - XLogWalRcvSendReply(); - XLogWalRcvSendHSFeedback(); - /* * If we've written some records, flush them to disk and let the - * startup process know about them. + * startup process and primary server know about them. 
*/ XLogWalRcvFlush(false); } @@ -581,7 +577,10 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) + { XLogWalRcvSendReply(); + XLogWalRcvSendHSFeedback(); + } } } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 49b49d2a18..94547245fe 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -66,7 +66,7 @@ WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ -static WalSnd *MyWalSnd = NULL; +WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ @@ -174,6 +174,8 @@ WalSenderMain(void) SpinLockRelease(&walsnd->mutex); } + SyncRepInitConfig(); + /* Main loop of walsender */ return WalSndLoop(); } @@ -584,6 +586,8 @@ ProcessStandbyReplyMessage(void) walsnd->apply = reply.apply; SpinLockRelease(&walsnd->mutex); } + + SyncRepReleaseWaiters(); } /* @@ -700,6 +704,7 @@ WalSndLoop(void) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); } /* @@ -771,7 +776,12 @@ WalSndLoop(void) * that point might wait for some time. 
*/ if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup) + { + ereport(DEBUG1, + (errmsg("standby \"%s\" has now caught up with primary", + application_name))); WalSndSetState(WALSNDSTATE_STREAMING); + } ProcessRepliesIfAny(); } @@ -1238,6 +1248,8 @@ WalSndShmemInit(void) /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); + SHMQueueInit(&(WalSndCtl->SyncRepQueue)); + for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; @@ -1304,12 +1316,15 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 6 +#define PG_STAT_GET_WAL_SENDERS_COLS 8 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; + int sync_priority[max_wal_senders]; + int priority = 0; + int sync_standby = -1; int i; /* check to see if caller supports us returning a tuplestore */ @@ -1337,6 +1352,33 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); + /* + * Get the priorities of sync standbys all in one go, to minimise + * lock acquisitions and to allow us to evaluate who is the current + * sync standby. This code must match the code in SyncRepReleaseWaiters(). 
+ */ + LWLockAcquire(SyncRepLock, LW_SHARED); + for (i = 0; i < max_wal_senders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + if (walsnd->pid != 0) + { + sync_priority[i] = walsnd->sync_standby_priority; + + if (walsnd->state == WALSNDSTATE_STREAMING && + walsnd->sync_standby_priority > 0 && + (priority == 0 || + priority > walsnd->sync_standby_priority)) + { + priority = walsnd->sync_standby_priority; + sync_standby = i; + } + } + } + LWLockRelease(SyncRepLock); + for (i = 0; i < max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ @@ -1370,11 +1412,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * Only superusers can see details. Other users only get * the pid value to know it's a walsender, but no details. */ - nulls[1] = true; - nulls[2] = true; - nulls[3] = true; - nulls[4] = true; - nulls[5] = true; + MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1); } else { @@ -1401,6 +1439,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) snprintf(location, sizeof(location), "%X/%X", apply.xlogid, apply.xrecoff); values[5] = CStringGetTextDatum(location); + + values[6] = Int32GetDatum(sync_priority[i]); + + /* + * More easily understood version of standby state. + * This is purely informational, not different from priority. + */ + if (sync_priority[i] == 0) + values[7] = CStringGetTextDatum("ASYNC"); + else if (i == sync_standby) + values[7] = CStringGetTextDatum("SYNC"); + else + values[7] = CStringGetTextDatum("POTENTIAL"); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/backend/storage/ipc/shmqueue.c b/src/backend/storage/ipc/shmqueue.c index 1cf69a09c8..5d684b2b85 100644 --- a/src/backend/storage/ipc/shmqueue.c +++ b/src/backend/storage/ipc/shmqueue.c @@ -104,7 +104,6 @@ SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem) * element. Inserting "after" the queue head puts the elem * at the head of the queue. 
*/ -#ifdef NOT_USED void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem) { @@ -118,7 +117,6 @@ SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem) queue->next = elem; nextPtr->prev = elem; } -#endif /* NOT_USED */ /*-------------------- * SHMQueueNext -- Get the next element from a queue @@ -156,6 +154,25 @@ SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) return (Pointer) (((char *) elemPtr) - linkOffset); } +/*-------------------- + * SHMQueuePrev -- Get the previous element from a queue + * + * Same as SHMQueueNext, just starting at tail and moving towards head + * All other comments and usage applies. + */ +Pointer +SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) +{ + SHM_QUEUE *elemPtr = curElem->prev; + + Assert(ShmemAddrIsValid(curElem)); + + if (elemPtr == queue) /* back to the queue head? */ + return NULL; + + return (Pointer) (((char *) elemPtr) - linkOffset); +} + /* * SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise */ diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index afaf5995f0..ee03316050 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -39,6 +39,7 @@ #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" +#include "replication/syncrep.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" @@ -196,6 +197,7 @@ InitProcGlobal(void) PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; ProcGlobal->freeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* @@ -214,6 +216,7 @@ InitProcGlobal(void) PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; ProcGlobal->autovacFreeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* @@ -224,6 +227,7 @@ InitProcGlobal(void) { AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use 
*/ PGSemaphoreCreate(&(AuxiliaryProcs[i].sem)); + InitSharedLatch(&procs[i].waitLatch); } /* Create ProcStructLock spinlock, too */ @@ -326,6 +330,13 @@ InitProcess(void) SHMQueueInit(&(MyProc->myProcLocks[i])); MyProc->recoveryConflictPending = false; + /* Initialise for sync rep */ + MyProc->waitLSN.xlogid = 0; + MyProc->waitLSN.xrecoff = 0; + MyProc->syncRepState = SYNC_REP_NOT_WAITING; + SHMQueueElemInit(&(MyProc->syncRepLinks)); + OwnLatch((Latch *) &MyProc->waitLatch); + /* * We might be reusing a semaphore that belonged to a failed process. So * be careful and reinitialize its value here. (This is not strictly @@ -365,6 +376,7 @@ InitProcessPhase2(void) /* * Arrange to clean that up at backend exit. */ + on_shmem_exit(SyncRepCleanupAtProcExit, 0); on_shmem_exit(RemoveProcFromArray, 0); } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 529148a040..0bf1845599 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -55,6 +55,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -754,6 +755,14 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL }, { + {"synchronous_replication", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Requests synchronous replication."), + NULL + }, + &sync_rep_mode, + false, NULL, NULL + }, + { {"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS, gettext_noop("Continues processing past damaged page headers."), gettext_noop("Detection of a damaged page header normally causes PostgreSQL to " @@ -2717,6 +2726,16 @@ static struct config_string ConfigureNamesString[] = }, { + {"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("List of potential standby names to synchronise with."), + NULL, + GUC_LIST_INPUT + }, + &SyncRepStandbyNames, + "", 
assign_synchronous_standby_names, NULL + }, + + { {"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets default text search configuration."), NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 6bfd0fd87c..ed70223f13 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -184,7 +184,16 @@ #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables -# - Streaming Replication - +# - Replication - User Settings + +#synchronous_replication = off # does commit wait for reply from standby + +# - Streaming Replication - Server Settings + +#synchronous_standby_names = '' # standby servers that provide sync rep + # comma-separated list of application_name from standby(s); + # '*' = all + #max_wal_senders = 0 # max number of walsender processes # (change requires restart) diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 96a463398c..0533e5a686 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2542,7 +2542,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location}" _null_ pg_stat_get_wal_senders _null_ _null_ 
_null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23,25}" "{o,o,o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 5843307c9d..8a8c9398d1 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "nodes/nodes.h" #include "storage/latch.h" +#include "replication/syncrep.h" #include "storage/spin.h" @@ -52,11 +53,32 @@ typedef struct WalSnd * to do. */ Latch latch; + + /* + * The priority order of the standby managed by this WALSender, as + * listed in synchronous_standby_names, or 0 if not-listed. + * Protected by SyncRepLock. + */ + int sync_standby_priority; } WalSnd; +extern WalSnd *MyWalSnd; + /* There is one WalSndCtl struct for the whole database cluster */ typedef struct { + /* + * Synchronous replication queue. Protected by SyncRepLock. + */ + SHM_QUEUE SyncRepQueue; + + /* + * Current location of the head of the queue. All waiters should have + * a waitLSN that follows this value, or they are currently being woken + * to remove themselves from the queue. Protected by SyncRepLock. 
+ */ + XLogRecPtr lsn; + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index ad0bcd775b..438a48d8dc 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -78,6 +78,7 @@ typedef enum LWLockId SerializableFinishedListLock, SerializablePredicateLockListLock, OldSerXidLock, + SyncRepLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 78dbadef4c..1d6642c6c7 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -14,6 +14,9 @@ #ifndef _PROC_H_ #define _PROC_H_ +#include "access/xlog.h" +#include "replication/syncrep.h" +#include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" #include "utils/timestamp.h" @@ -116,6 +119,17 @@ struct PGPROC * lock object by this backend */ /* + * Info to allow us to wait for synchronous replication, if needed. + * waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend. + * syncRepState must not be touched except by owning process or WALSender. + * syncRep_links used only while holding SyncRepLock. + */ + Latch waitLatch; /* allow us to wait for sync rep */ + XLogRecPtr waitLSN; /* waiting for this LSN or higher */ + int syncRepState; /* wait state for sync rep */ + SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of * their lock. 
diff --git a/src/include/storage/shmem.h b/src/include/storage/shmem.h index f23740c9e3..0b7da77ccd 100644 --- a/src/include/storage/shmem.h +++ b/src/include/storage/shmem.h @@ -67,8 +67,11 @@ extern void SHMQueueInit(SHM_QUEUE *queue); extern void SHMQueueElemInit(SHM_QUEUE *queue); extern void SHMQueueDelete(SHM_QUEUE *queue); extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem); +extern void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem); extern Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset); +extern Pointer SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, + Size linkOffset); extern bool SHMQueueEmpty(const SHM_QUEUE *queue); extern bool SHMQueueIsDetached(const SHM_QUEUE *queue); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 02043ab42c..20cdc39752 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1298,7 +1298,7 @@ SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset; pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, 
pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d; pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d; - pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid)); + pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority, w.sync_state FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority, 
sync_state) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid)); pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text)); pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text)); pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL)); |