summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/applyparallelworker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/applyparallelworker.c')
-rw-r--r--src/backend/replication/logical/applyparallelworker.c30
1 files changed, 16 insertions, 14 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index ee7a18137f..82c1ddcdcb 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
list_length(ParallelApplyWorkerPool) >
(max_parallel_apply_workers_per_subscription / 2))
{
- int slot_no;
- uint16 generation;
-
- SpinLockAcquire(&winfo->shared->mutex);
- generation = winfo->shared->logicalrep_worker_generation;
- slot_no = winfo->shared->logicalrep_worker_slot_no;
- SpinLockRelease(&winfo->shared->mutex);
-
- logicalrep_pa_worker_stop(slot_no, generation);
-
+ logicalrep_pa_worker_stop(winfo);
pa_free_worker_info(winfo);
return;
@@ -636,8 +627,11 @@ pa_detach_all_error_mq(void)
{
ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
- shm_mq_detach(winfo->error_mq_handle);
- winfo->error_mq_handle = NULL;
+ if (winfo->error_mq_handle)
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ }
}
}
@@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
* Make sure the leader apply worker tries to read from our error queue one more
* time. This guards against the case where we exit uncleanly without sending
* an ErrorResponse, for example because some code calls proc_exit directly.
+ *
+ * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
+ * if any. See ParallelWorkerShutdown for details.
*/
static void
pa_shutdown(int code, Datum arg)
@@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("bad magic number in dynamic shared memory segment")));
- before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
-
/* Look up the shared information. */
shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
MyParallelShared = shared;
@@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg)
*/
logicalrep_worker_attach(worker_slot);
+ /*
+ * Register the shutdown callback after we are attached to the worker
+ * slot. This is to ensure that MyLogicalRepWorker remains valid when this
+ * callback is invoked.
+ */
+ before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
+
SpinLockAcquire(&MyParallelShared->mutex);
MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
MyParallelShared->logicalrep_worker_slot_no = worker_slot;