summaryrefslogtreecommitdiff
path: root/src/backend/commands/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--src/backend/commands/async.c50
1 files changed, 45 insertions, 5 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 0c295d4fa8..d8a5de6d28 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -198,12 +198,19 @@ typedef struct QueuePosition
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+ (asyncQueuePagePrecedesLogically((x).page, (y).page) ? (y) : \
+ (x).page != (y).page ? (x) : \
+ (x).offset > (y).offset ? (x) : (y))
+
/*
* Struct describing a listening backend's status
*/
typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
+ Oid dboid; /* backend's database OID, or InvalidOid */
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
@@ -222,6 +229,7 @@ typedef struct QueueBackendStatus
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
* of other backends and also change the head and tail pointers.
*
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need both locks, we always first
* get AsyncQueueLock and then AsyncCtlLock.
*
@@ -232,8 +240,8 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the tail
- * of the "slowest" backend */
+ QueuePosition tail; /* the global tail is equivalent to the pos of
+ * the "slowest" backend */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
@@ -244,6 +252,7 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
+#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
/*
@@ -477,6 +486,7 @@ AsyncShmemInit(void)
for (i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
+ QUEUE_BACKEND_DBOID(i) = InvalidOid;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
@@ -929,6 +939,10 @@ AtCommit_Notify(void)
static void
Exec_ListenPreCommit(void)
{
+ QueuePosition head;
+ QueuePosition max;
+ int i;
+
/*
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
@@ -956,10 +970,34 @@ Exec_ListenPreCommit(void)
* over already-committed notifications. This ensures we cannot miss any
* not-yet-committed notifications. We might get a few more but that
* doesn't hurt.
+ *
+ * In some scenarios there might be a lot of committed notifications that
+ * have not yet been pruned away (because some backend is being lazy about
+ * reading them). To reduce our startup time, we can look at other
+ * backends and adopt the maximum "pos" pointer of any backend that's in
+ * our database; any notifications it's already advanced over are surely
+ * committed and need not be re-examined by us. (We must consider only
+ * backends connected to our DB, because others will not have bothered to
+ * check committed-ness of notifications in our DB.) But we only bother
+ * with that if there's more than a page worth of notifications
+ * outstanding, otherwise scanning all the other backends isn't worth it.
+ *
+ * We need exclusive lock here so we can look at other backends' entries.
*/
- LWLockAcquire(AsyncQueueLock, LW_SHARED);
- QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ head = QUEUE_HEAD;
+ max = QUEUE_TAIL;
+ if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
+ {
+ for (i = 1; i <= MaxBackends; i++)
+ {
+ if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+ }
+ }
+ QUEUE_BACKEND_POS(MyBackendId) = max;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
+ QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
LWLockRelease(AsyncQueueLock);
/* Now we are listed in the global array, so remember we're listening */
@@ -975,7 +1013,8 @@ Exec_ListenPreCommit(void)
*
* This will also advance the global tail pointer if possible.
*/
- asyncQueueReadAllNotifications();
+ if (!QUEUE_POS_EQUAL(max, head))
+ asyncQueueReadAllNotifications();
}
/*
@@ -1178,6 +1217,7 @@ asyncQueueUnregister(void)
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+ QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
LWLockRelease(AsyncQueueLock);
/* mark ourselves as no longer listed in the global array */