summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c24
1 files changed, 21 insertions, 3 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index ceea126231..87b5593d2d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
- * Stop the logical replication parallel apply worker corresponding to the
- * input slot number.
+ * Stop the given logical replication parallel apply worker.
*
* Node that the function sends SIGINT instead of SIGTERM to the parallel apply
* worker so that the worker exits cleanly.
*/
void
-logicalrep_pa_worker_stop(int slot_no, uint16 generation)
+logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
{
+ int slot_no;
+ uint16 generation;
LogicalRepWorker *worker;
+ SpinLockAcquire(&winfo->shared->mutex);
+ generation = winfo->shared->logicalrep_worker_generation;
+ slot_no = winfo->shared->logicalrep_worker_slot_no;
+ SpinLockRelease(&winfo->shared->mutex);
+
Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
+ /*
+ * Detach from the error_mq_handle for the parallel apply worker before
+ * stopping it. This prevents the leader apply worker from trying to
+ * receive the message from the error queue that might already be detached
+ * by the parallel apply worker.
+ */
+ if (winfo->error_mq_handle)
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ }
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = &LogicalRepCtx->workers[slot_no];