diff options
-rw-r--r-- | doc/src/sgml/catalogs.sgml | 10 | ||||
-rw-r--r-- | doc/src/sgml/ref/alter_subscription.sgml | 2 | ||||
-rw-r--r-- | doc/src/sgml/ref/create_subscription.sgml | 31 | ||||
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 8 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 54 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 6 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 8 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_dump.c | 11 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_dump.h | 1 | ||||
-rw-r--r-- | src/bin/psql/describe.c | 5 | ||||
-rw-r--r-- | src/include/catalog/pg_subscription.h | 8 | ||||
-rw-r--r-- | src/test/regress/expected/subscription.out | 30 | ||||
-rw-r--r-- | src/test/regress/sql/subscription.sql | 4 |
13 files changed, 145 insertions, 33 deletions
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 5883673448..5254bb3025 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6531,6 +6531,16 @@ </row> <row> + <entry><structfield>subsynccommit</structfield></entry> + <entry><type>text</type></entry> + <entry></entry> + <entry> + Contains the value of the <varname>synchronous_commit</varname> + setting for the subscription workers. + </entry> + </row> + + <row> <entry><structfield>subconninfo</structfield></entry> <entry><type>text</type></entry> <entry></entry> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 640fac0a15..f71ee38b40 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep <phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase> SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable> + | SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable> ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION <replaceable class="PARAMETER">publication_name</replaceable> [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH } ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) @@ -91,6 +92,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE <varlistentry> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <term><literal>SLOT NAME = <replaceable class="parameter">slot_name</replaceable></literal></term> + <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term> <listitem> <para> These clauses alter properties originally set by diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 3410d6fc8c..3c51012df8 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -32,6 +32,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl | CREATE SLOT | NOCREATE SLOT | SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable> | COPY DATA | NOCOPY DATA + | SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable> | NOCONNECT </synopsis> </refsynopsisdiv> @@ -148,6 +149,36 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl </varlistentry> <varlistentry> + <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term> + <listitem> + <para> + The value of this parameter overrides the + <xref linkend="guc-synchronous-commit"> setting. The default value is + <literal>off</literal>. + </para> + + <para> + It is safe to use <literal>off</literal> for logical replication: If the + subscriber loses transactions because of missing synchronization, the + data will be resent from the publisher. + </para> + + <para> + A different setting might be appropriate when doing synchronous logical + replication. The logical replication workers report the positions of + writes and flushes to the publisher, and when using synchronous + replication, the publisher will wait for the actual flush. This means + that setting <literal>SYNCHRONOUS_COMMIT</literal> for the subscriber + to <literal>off</literal> when the subscription is used for synchronous + replication might increase the latency for <command>COMMIT</command> on + the publisher. In this scenario, it can be advantageous to set + <literal>SYNCHRONOUS_COMMIT</literal> to <literal>local</literal> or + higher. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><literal>NOCONNECT</literal></term> <listitem> <para> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 7e38b1a31c..a18385055e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -85,6 +85,14 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + /* Get synccommit */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subsynccommit, + &isnull); + Assert(!isnull); + sub->synccommit = TextDatumGetCString(datum); + /* Get publications */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 7b8b11cb81..519c6846e3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,6 +44,7 @@ #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name, - bool *copy_data) + bool *copy_data, char **synchronous_commit) { ListCell *lc; bool connect_given = false; @@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *slot_name = NULL; if (copy_data) *copy_data = true; + if (synchronous_commit) + *synchronous_commit = NULL; /* Parse options */ foreach (lc, options) @@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, copy_data_given = true; *copy_data = !defGetBoolean(defel); } + else if (strcmp(defel->defname, "synchronous_commit") == 0 && + synchronous_commit) + { + if (*synchronous_commit) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *synchronous_commit = defGetString(defel); + + /* Test if the given value is valid for synchronous_commit GUC. */ + (void) set_config_option("synchronous_commit", *synchronous_commit, + PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, + false, 0, false); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + char *synchronous_commit; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data); + &enabled, &create_slot, &slotname, ©_data, + &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) if (slotname == NULL) slotname = stmt->subname; + /* The default for synchronous_commit of subscriptions is off. */ + if (synchronous_commit == NULL) + synchronous_commit = "off"; conninfo = stmt->conninfo; publications = stmt->publication; @@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) CStringGetTextDatum(conninfo); values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(slotname)); + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); @@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_OPTIONS: { char *slot_name; + char *synchronous_commit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL); + NULL, &slot_name, NULL, + &synchronous_commit); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; + if (slot_name) + { + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + } + if (synchronous_commit) + { + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum(synchronous_commit); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; + } update_tuple = true; break; @@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL); + NULL, NULL, NULL); Assert(enabled_given); values[Anum_pg_subscription_subenabled - 1] = @@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7ba239c02c..2d663f6308 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -129,17 +129,13 @@ get_subscription_list(void) */ oldcxt = MemoryContextSwitchTo(resultcxt); - sub = (Subscription *) palloc(sizeof(Subscription)); + sub = (Subscription *) palloc0(sizeof(Subscription)); sub->oid = HeapTupleGetOid(tup); sub->dbid = subform->subdbid; sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); - /* We don't fill fields we are not interested in. */ - sub->conninfo = NULL; - sub->slotname = NULL; - sub->publications = NIL; res = lappend(res, sub); MemoryContextSwitchTo(oldcxt); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3313448e7b..29b6c6a168 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1416,6 +1416,10 @@ reread_subscription(void) MemoryContextSwitchTo(oldctx); + /* Change synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", MySubscription->synccommit, + PGC_BACKEND, PGC_S_OVERRIDE); + if (started_tx) CommitTransactionCommand(); @@ -1485,6 +1489,10 @@ ApplyWorkerMain(Datum main_arg) MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); + /* Setup synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", MySubscription->synccommit, + PGC_BACKEND, PGC_S_OVERRIDE); + if (!MySubscription->enabled) { ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 1029354462..3eccfa626b 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3683,6 +3683,7 @@ getSubscriptions(Archive *fout) int i_rolname; int i_subconninfo; int i_subslotname; + int i_subsynccommit; int i_subpublications; int i, ntups; @@ -3714,7 +3715,8 @@ getSubscriptions(Archive *fout) appendPQExpBuffer(query, "SELECT s.tableoid, s.oid, s.subname," "(%s s.subowner) AS rolname, " - " s.subconninfo, s.subslotname, s.subpublications " + " s.subconninfo, s.subslotname, s.subsynccommit, " + " s.subpublications " "FROM pg_catalog.pg_subscription s " "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database" " WHERE datname = current_database())", @@ -3729,6 +3731,7 @@ getSubscriptions(Archive *fout) i_rolname = PQfnumber(res, "rolname"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); + i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -3744,6 +3747,8 @@ getSubscriptions(Archive *fout) subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname)); + subinfo[i].subsynccommit = + pg_strdup(PQgetvalue(res, i, i_subsynccommit)); subinfo[i].subpublications = pg_strdup(PQgetvalue(res, i, i_subpublications)); @@ -3810,6 +3815,10 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) appendPQExpBuffer(query, " PUBLICATION %s WITH (NOCONNECT, SLOT NAME = ", publications->data); appendStringLiteralAH(query, subinfo->subslotname, fout); + + if (strcmp(subinfo->subsynccommit, "off") != 0) + appendPQExpBuffer(query, ", SYNCHRONOUS_COMMIT = %s", fmtId(subinfo->subsynccommit)); + appendPQExpBufferStr(query, ");\n"); appendPQExpBuffer(labelq, "SUBSCRIPTION %s", fmtId(subinfo->dobj.name)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index ba85392f11..471cfce92a 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -616,6 +616,7 @@ typedef struct _SubscriptionInfo char *rolname; char *subconninfo; char *subslotname; + char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 2494d046b2..59121b8d1b 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5199,7 +5199,8 @@ describeSubscriptions(const char *pattern, bool verbose) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, + false, false}; if (pset.sversion < 100000) { @@ -5225,7 +5226,9 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { appendPQExpBuffer(&buf, + ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", + gettext_noop("Synchronous commit"), gettext_noop("Conninfo")); } diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0811880a8f..fae542b612 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -43,7 +43,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE #ifdef CATALOG_VARLEN /* variable-length fields start here */ text subconninfo; /* Connection string to the publisher */ NameData subslotname; /* Slot name on publisher */ - + text subsynccommit; /* Synchronous commit setting for worker */ text subpublications[1]; /* List of publications subscribed to */ #endif } FormData_pg_subscription; @@ -54,14 +54,15 @@ typedef FormData_pg_subscription *Form_pg_subscription; * compiler constants for pg_subscription * ---------------- */ -#define Natts_pg_subscription 7 +#define Natts_pg_subscription 8 #define Anum_pg_subscription_subdbid 1 #define Anum_pg_subscription_subname 2 #define Anum_pg_subscription_subowner 3 #define Anum_pg_subscription_subenabled 4 #define Anum_pg_subscription_subconninfo 5 #define Anum_pg_subscription_subslotname 6 -#define Anum_pg_subscription_subpublications 7 +#define Anum_pg_subscription_subsynccommit 7 +#define Anum_pg_subscription_subpublications 8 typedef struct Subscription @@ -73,6 +74,7 @@ typedef struct Subscription bool enabled; /* Indicates if the subscription is enabled */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ + char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ } Subscription; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 8760d5970a..47531edd1b 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -46,10 +46,10 @@ CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WI ERROR: must be superuser to create subscriptions SET SESSION AUTHORIZATION 'regress_subscription_user'; \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+-------------+--------------------- - testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous commit | Conninfo +---------+---------------------------+---------+-------------+--------------------+--------------------- + testsub | regress_subscription_user | f | {testpub} | off | dbname=doesnotexist (1 row) ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3 NOREFRESH; @@ -59,10 +59,10 @@ ALTER SUBSCRIPTION testsub WITH (SLOT NAME = 'newname'); ALTER SUBSCRIPTION doesnotexist CONNECTION 'dbname=doesnotexist2'; ERROR: subscription "doesnotexist" does not exist \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+---------------------+---------------------- - testsub | regress_subscription_user | f | {testpub2,testpub3} | dbname=doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous commit | Conninfo +---------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub | regress_subscription_user | f | {testpub2,testpub3} | off | dbname=doesnotexist2 (1 row) BEGIN; @@ -89,11 +89,15 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; ERROR: must be owner of subscription testsub RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; -\dRs - List of subscriptions - Name | Owner | Enabled | Publication --------------+---------------------------+---------+--------------------- - testsub_foo | regress_subscription_user | f | {testpub2,testpub3} +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local); +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar); +ERROR: invalid value for parameter "synchronous_commit": "foobar" +HINT: Available values: local, remote_write, remote_apply, on, off. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous commit | Conninfo +-------------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | local | dbname=doesnotexist2 (1 row) -- rename back to keep the rest simple diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 7bdc2b3503..1b30d150ce 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -66,8 +66,10 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local); +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar); -\dRs +\dRs+ -- rename back to keep the rest simple ALTER SUBSCRIPTION testsub_foo RENAME TO testsub; |