author     Amit Kapila <akapila@postgresql.org>  2023-05-09 09:28:06 +0530
committer  Amit Kapila <akapila@postgresql.org>  2023-05-09 09:28:06 +0530
commit     3d144c6c86025272e1711539f5fafb6fb85c4feb (patch)
tree       e8e64126bdc26dbb43be7d0505118cd2a76d91f5
parent     455f948b0d03a556533a7e4a1a8abf45f0eb202e (diff)
Fix invalid memory access during the shutdown of the parallel apply worker.
The callback function pa_shutdown() accesses MyLogicalRepWorker, which may not
be initialized if an error occurs during the initialization of the parallel
apply worker. The other problem is that, even when the worker initializes
successfully, by the time the callback is invoked MyLogicalRepWorker will
already have been reset by another callback, logicalrep_worker_onexit, so it
won't have the required information. To fix this, register the shutdown
callback only after we are attached to the worker slot.

After this fix, we observed another issue: sometimes the leader apply worker
tries to receive a message from an error queue that has already been detached
by the parallel apply worker, leading to an error. To prevent this, ensure
that the leader apply worker detaches from the parallel apply worker's error
queue before stopping it.

Reported-by: Sawada Masahiko
Author: Hou Zhijie
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDo+yUwNq6nTrvE2h9bB2vZfcag=jxWc7QxuWCmkDAqcA@mail.gmail.com
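The fix relies on exit callbacks being invoked in reverse order of
registration (before_shmem_exit() callbacks, like C's atexit() handlers, run
LIFO). Registering pa_shutdown() only after attaching to the worker slot
therefore guarantees both that the slot is initialized whenever the callback
fires and that it fires before the slot's own cleanup callback resets it. A
minimal standalone sketch of that ordering (plain C using atexit(); the names
worker_slot, worker_onexit and shutdown_callback are illustrative stand-ins,
not PostgreSQL code):

/*
 * Standalone sketch: exit callbacks run in reverse order of registration,
 * so the callback registered after "attach" still sees the attached state.
 */
#include <stdio.h>
#include <stdlib.h>

static int worker_slot = -1;        /* stand-in for the attached slot */

static void worker_onexit(void)     /* models the slot's own cleanup */
{
	printf("worker_onexit: releasing slot %d\n", worker_slot);
	worker_slot = -1;               /* slot information is reset here */
}

static void shutdown_callback(void) /* models pa_shutdown */
{
	/* Runs before worker_onexit because it was registered later. */
	printf("shutdown_callback: slot is still %d\n", worker_slot);
}

int main(void)
{
	worker_slot = 3;                /* models attaching to the slot */
	atexit(worker_onexit);

	/* Register only after the slot is attached, as the patch does. */
	atexit(shutdown_callback);
	return 0;                       /* callbacks fire in LIFO order */
}

Running it prints shutdown_callback first, with the slot still set, and only
then worker_onexit.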
-rw-r--r--  src/backend/replication/logical/applyparallelworker.c  30
-rw-r--r--  src/backend/replication/logical/launcher.c              24
-rw-r--r--  src/include/replication/worker_internal.h                2
3 files changed, 38 insertions(+), 18 deletions(-)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index ee7a18137f..82c1ddcdcb 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
list_length(ParallelApplyWorkerPool) >
(max_parallel_apply_workers_per_subscription / 2))
{
- int slot_no;
- uint16 generation;
-
- SpinLockAcquire(&winfo->shared->mutex);
- generation = winfo->shared->logicalrep_worker_generation;
- slot_no = winfo->shared->logicalrep_worker_slot_no;
- SpinLockRelease(&winfo->shared->mutex);
-
- logicalrep_pa_worker_stop(slot_no, generation);
-
+ logicalrep_pa_worker_stop(winfo);
pa_free_worker_info(winfo);
return;
@@ -636,8 +627,11 @@ pa_detach_all_error_mq(void)
{
ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
- shm_mq_detach(winfo->error_mq_handle);
- winfo->error_mq_handle = NULL;
+ if (winfo->error_mq_handle)
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ }
}
}
@@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
* Make sure the leader apply worker tries to read from our error queue one more
* time. This guards against the case where we exit uncleanly without sending
* an ErrorResponse, for example because some code calls proc_exit directly.
+ *
+ * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
+ * if any. See ParallelWorkerShutdown for details.
*/
static void
pa_shutdown(int code, Datum arg)
@@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("bad magic number in dynamic shared memory segment")));
- before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
-
/* Look up the shared information. */
shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
MyParallelShared = shared;
@@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg)
*/
logicalrep_worker_attach(worker_slot);
+ /*
+ * Register the shutdown callback after we are attached to the worker
+ * slot. This is to ensure that MyLogicalRepWorker remains valid when this
+ * callback is invoked.
+ */
+ before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
+
SpinLockAcquire(&MyParallelShared->mutex);
MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
MyParallelShared->logicalrep_worker_slot_no = worker_slot;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index ceea126231..87b5593d2d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
- * Stop the logical replication parallel apply worker corresponding to the
- * input slot number.
+ * Stop the given logical replication parallel apply worker.
*
* Note that the function sends SIGINT instead of SIGTERM to the parallel apply
* worker so that the worker exits cleanly.
*/
void
-logicalrep_pa_worker_stop(int slot_no, uint16 generation)
+logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
{
+ int slot_no;
+ uint16 generation;
LogicalRepWorker *worker;
+ SpinLockAcquire(&winfo->shared->mutex);
+ generation = winfo->shared->logicalrep_worker_generation;
+ slot_no = winfo->shared->logicalrep_worker_slot_no;
+ SpinLockRelease(&winfo->shared->mutex);
+
Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
+ /*
+ * Detach from the error_mq_handle for the parallel apply worker before
+ * stopping it. This prevents the leader apply worker from trying to
+ * receive the message from the error queue that might already be detached
+ * by the parallel apply worker.
+ */
+ if (winfo->error_mq_handle)
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ }
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = &LogicalRepCtx->workers[slot_no];
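The NULL check-and-clear around shm_mq_detach() makes the detach idempotent:
whichever of logicalrep_pa_worker_stop() or pa_detach_all_error_mq() runs
first detaches the queue, and the other becomes a no-op. A standalone sketch
of that pattern (plain C; QueueHandle, queue_detach and detach_error_queue
are hypothetical stand-ins, not PostgreSQL APIs):

/*
 * Standalone sketch: detach only if still attached, then clear the pointer
 * so a later cleanup path does not detach the same queue twice.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct QueueHandle
{
	const char *name;
} QueueHandle;

static void queue_detach(QueueHandle *h)
{
	printf("detached from %s\n", h->name);
	free(h);
}

static void detach_error_queue(QueueHandle **handle)
{
	if (*handle != NULL)
	{
		queue_detach(*handle);
		*handle = NULL;
	}
}

int main(void)
{
	QueueHandle *error_mq = malloc(sizeof(QueueHandle));

	error_mq->name = "parallel apply worker error queue";

	detach_error_queue(&error_mq);  /* detaches and clears the handle */
	detach_error_queue(&error_mq);  /* safe no-op: already detached */
	return 0;
}

The second call is harmless because the first call cleared the pointer,
mirroring how the patch sets winfo->error_mq_handle to NULL after detaching.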
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b57eed052f..343e781896 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -235,7 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
dsm_handle subworker_dsm);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
+extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);