summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2023-05-03 10:13:13 +0530
committerAmit Kapila <akapila@postgresql.org>2023-05-03 10:17:49 +0530
commitde63f8dadee4afa152ce177fd3c562d47373a728 (patch)
tree0d0dc49ae8ee5dded48eeb1e1594f2ceb2c708f4
parent6489875ce6b16662142bc70e003437b9753c199f (diff)
downloadpostgresql-de63f8dadee4afa152ce177fd3c562d47373a728.tar.gz
Fix assertion failure in apply worker.
During exit, the logical replication apply worker tries to release session level locks, if any. However, if the apply worker exits due to an error before its connection is initialized, trying to release locks can lead to assertion failure. The locks will be acquired once the worker is initialized, so we don't need to release them till the worker initialization is complete. Reported-by: Alexander Lakhin Author: Hou Zhijie based on inputs from Sawada Masahiko and Amit Kapila Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/2185d65f-5aae-3efa-c48f-fb42b173ef5c@gmail.com
-rw-r--r--src/backend/replication/logical/applyparallelworker.c4
-rw-r--r--src/backend/replication/logical/launcher.c5
-rw-r--r--src/backend/replication/logical/worker.c7
-rw-r--r--src/include/replication/worker_internal.h2
4 files changed, 17 insertions, 1 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 4518683779..ee7a18137f 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -873,6 +873,8 @@ ParallelApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetInt32(main_arg);
char originname[NAMEDATALEN];
+ InitializingApplyWorker = true;
+
/* Setup signal handling. */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
@@ -940,6 +942,8 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializeApplyWorker();
+ InitializingApplyWorker = false;
+
/* Setup replication origin tracking. */
StartTransactionCommand();
ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 970d170e73..ceea126231 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -797,8 +797,11 @@ logicalrep_worker_onexit(int code, Datum arg)
* Session level locks may be acquired outside of a transaction in
* parallel apply mode and will not be released when the worker
* terminates, so manually release all locks before the worker exits.
+ *
+ * The locks will be acquired once the worker is initialized.
*/
- LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+ if (!InitializingApplyWorker)
+ LockReleaseAll(DEFAULT_LOCKMETHOD, true);
ApplyLauncherWakeup();
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dbf88c9553..879309b316 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -331,6 +331,9 @@ static TransactionId stream_xid = InvalidTransactionId;
*/
static uint32 parallel_stream_nchanges = 0;
+/* Are we initializing a apply worker? */
+bool InitializingApplyWorker = false;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -4526,6 +4529,8 @@ ApplyWorkerMain(Datum main_arg)
WalRcvStreamOptions options;
int server_version;
+ InitializingApplyWorker = true;
+
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
@@ -4548,6 +4553,8 @@ ApplyWorkerMain(Datum main_arg)
InitializeApplyWorker();
+ InitializingApplyWorker = false;
+
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dce71d2c50..b57eed052f 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,8 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
extern PGDLLIMPORT bool in_remote_transaction;
+extern PGDLLIMPORT bool InitializingApplyWorker;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);