summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/applyparallelworker.c30
-rw-r--r--src/backend/replication/logical/launcher.c24
-rw-r--r--src/include/replication/worker_internal.h2
3 files changed, 38 insertions, 18 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index ee7a18137f..82c1ddcdcb 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
list_length(ParallelApplyWorkerPool) >
(max_parallel_apply_workers_per_subscription / 2))
{
- int slot_no;
- uint16 generation;
-
- SpinLockAcquire(&winfo->shared->mutex);
- generation = winfo->shared->logicalrep_worker_generation;
- slot_no = winfo->shared->logicalrep_worker_slot_no;
- SpinLockRelease(&winfo->shared->mutex);
-
- logicalrep_pa_worker_stop(slot_no, generation);
-
+ logicalrep_pa_worker_stop(winfo);
pa_free_worker_info(winfo);
return;
@@ -636,8 +627,11 @@ pa_detach_all_error_mq(void)
{
ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
- shm_mq_detach(winfo->error_mq_handle);
- winfo->error_mq_handle = NULL;
+ if (winfo->error_mq_handle)
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ }
}
}
@@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
* Make sure the leader apply worker tries to read from our error queue one more
* time. This guards against the case where we exit uncleanly without sending
* an ErrorResponse, for example because some code calls proc_exit directly.
+ *
+ * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
+ * if any. See ParallelWorkerShutdown for details.
*/
static void
pa_shutdown(int code, Datum arg)
@@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("bad magic number in dynamic shared memory segment")));
- before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
-
/* Look up the shared information. */
shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
MyParallelShared = shared;
@@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg)
*/
logicalrep_worker_attach(worker_slot);
+ /*
+ * Register the shutdown callback after we are attached to the worker
+ * slot. This is to ensure that MyLogicalRepWorker remains valid when this
+ * callback is invoked.
+ */
+ before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
+
SpinLockAcquire(&MyParallelShared->mutex);
MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
MyParallelShared->logicalrep_worker_slot_no = worker_slot;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index ceea126231..87b5593d2d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
- * Stop the logical replication parallel apply worker corresponding to the
- * input slot number.
+ * Stop the given logical replication parallel apply worker.
*
* Node that the function sends SIGINT instead of SIGTERM to the parallel apply
* worker so that the worker exits cleanly.
*/
void
-logicalrep_pa_worker_stop(int slot_no, uint16 generation)
+logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
{
+ int slot_no;
+ uint16 generation;
LogicalRepWorker *worker;
+ SpinLockAcquire(&winfo->shared->mutex);
+ generation = winfo->shared->logicalrep_worker_generation;
+ slot_no = winfo->shared->logicalrep_worker_slot_no;
+ SpinLockRelease(&winfo->shared->mutex);
+
Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
+ /*
+ * Detach from the error_mq_handle for the parallel apply worker before
+ * stopping it. This prevents the leader apply worker from trying to
+ * receive the message from the error queue that might already be detached
+ * by the parallel apply worker.
+ */
+ if (winfo->error_mq_handle)
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ }
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = &LogicalRepCtx->workers[slot_no];
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b57eed052f..343e781896 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -235,7 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
dsm_handle subworker_dsm);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
+extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);