diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 24 |
1 files changed, 21 insertions, 3 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index ceea126231..87b5593d2d 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* - * Stop the logical replication parallel apply worker corresponding to the - * input slot number. + * Stop the given logical replication parallel apply worker. * * Node that the function sends SIGINT instead of SIGTERM to the parallel apply * worker so that the worker exits cleanly. */ void -logicalrep_pa_worker_stop(int slot_no, uint16 generation) +logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) { + int slot_no; + uint16 generation; LogicalRepWorker *worker; + SpinLockAcquire(&winfo->shared->mutex); + generation = winfo->shared->logicalrep_worker_generation; + slot_no = winfo->shared->logicalrep_worker_slot_no; + SpinLockRelease(&winfo->shared->mutex); + Assert(slot_no >= 0 && slot_no < max_logical_replication_workers); + /* + * Detach from the error_mq_handle for the parallel apply worker before + * stopping it. This prevents the leader apply worker from trying to + * receive the message from the error queue that might already be detached + * by the parallel apply worker. + */ + if (winfo->error_mq_handle) + { + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = &LogicalRepCtx->workers[slot_no]; |