summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/catalogs.sgml10
-rw-r--r--doc/src/sgml/ref/alter_subscription.sgml2
-rw-r--r--doc/src/sgml/ref/create_subscription.sgml31
-rw-r--r--src/backend/catalog/pg_subscription.c8
-rw-r--r--src/backend/commands/subscriptioncmds.c54
-rw-r--r--src/backend/replication/logical/launcher.c6
-rw-r--r--src/backend/replication/logical/worker.c8
-rw-r--r--src/bin/pg_dump/pg_dump.c11
-rw-r--r--src/bin/pg_dump/pg_dump.h1
-rw-r--r--src/bin/psql/describe.c5
-rw-r--r--src/include/catalog/pg_subscription.h8
-rw-r--r--src/test/regress/expected/subscription.out30
-rw-r--r--src/test/regress/sql/subscription.sql4
13 files changed, 145 insertions, 33 deletions
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 5883673448..5254bb3025 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6531,6 +6531,16 @@
</row>
<row>
+ <entry><structfield>subsynccommit</structfield></entry>
+ <entry><type>text</type></entry>
+ <entry></entry>
+ <entry>
+ Contains the value of the <varname>synchronous_commit</varname>
+ setting for the subscription workers.
+ </entry>
+ </row>
+
+ <row>
<entry><structfield>subconninfo</structfield></entry>
<entry><type>text</type></entry>
<entry></entry>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 640fac0a15..f71ee38b40 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep
<phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase>
SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
+ | SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION <replaceable class="PARAMETER">publication_name</replaceable> [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH }
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] )
@@ -91,6 +92,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE
<varlistentry>
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<term><literal>SLOT NAME = <replaceable class="parameter">slot_name</replaceable></literal></term>
+ <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
<listitem>
<para>
These clauses alter properties originally set by
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 3410d6fc8c..3c51012df8 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -32,6 +32,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
| CREATE SLOT | NOCREATE SLOT
| SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
| COPY DATA | NOCOPY DATA
+ | SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
| NOCONNECT
</synopsis>
</refsynopsisdiv>
@@ -148,6 +149,36 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
</varlistentry>
<varlistentry>
+ <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
+ <listitem>
+ <para>
+ The value of this parameter overrides the
+ <xref linkend="guc-synchronous-commit"> setting. The default value is
+ <literal>off</literal>.
+ </para>
+
+ <para>
+ It is safe to use <literal>off</literal> for logical replication: If the
+ subscriber loses transactions because of missing synchronization, the
+ data will be resent from the publisher.
+ </para>
+
+ <para>
+ A different setting might be appropriate when doing synchronous logical
+ replication. The logical replication workers report the positions of
+ writes and flushes to the publisher, and when using synchronous
+ replication, the publisher will wait for the actual flush. This means
+ that setting <literal>SYNCHRONOUS_COMMIT</literal> for the subscriber
+ to <literal>off</literal> when the subscription is used for synchronous
+ replication might increase the latency for <command>COMMIT</command> on
+ the publisher. In this scenario, it can be advantageous to set
+ <literal>SYNCHRONOUS_COMMIT</literal> to <literal>local</literal> or
+ higher.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><literal>NOCONNECT</literal></term>
<listitem>
<para>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 7e38b1a31c..a18385055e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -85,6 +85,14 @@ GetSubscription(Oid subid, bool missing_ok)
Assert(!isnull);
sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
+ /* Get synccommit */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subsynccommit,
+ &isnull);
+ Assert(!isnull);
+ sub->synccommit = TextDatumGetCString(datum);
+
/* Get publications */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
tup,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 7b8b11cb81..519c6846e3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -44,6 +44,7 @@
#include "storage/lmgr.h"
#include "utils/builtins.h"
+#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void
parse_subscription_options(List *options, bool *connect, bool *enabled_given,
bool *enabled, bool *create_slot, char **slot_name,
- bool *copy_data)
+ bool *copy_data, char **synchronous_commit)
{
ListCell *lc;
bool connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
*slot_name = NULL;
if (copy_data)
*copy_data = true;
+ if (synchronous_commit)
+ *synchronous_commit = NULL;
/* Parse options */
foreach (lc, options)
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
copy_data_given = true;
*copy_data = !defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
+ synchronous_commit)
+ {
+ if (*synchronous_commit)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ *synchronous_commit = defGetString(defel);
+
+ /* Test if the given value is valid for synchronous_commit GUC. */
+ (void) set_config_option("synchronous_commit", *synchronous_commit,
+ PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+ false, 0, false);
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
bool enabled_given;
bool enabled;
bool copy_data;
+ char *synchronous_commit;
char *conninfo;
char *slotname;
char originname[NAMEDATALEN];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options, &connect, &enabled_given,
- &enabled, &create_slot, &slotname, &copy_data);
+ &enabled, &create_slot, &slotname, &copy_data,
+ &synchronous_commit);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
if (slotname == NULL)
slotname = stmt->subname;
+ /* The default for synchronous_commit of subscriptions is off. */
+ if (synchronous_commit == NULL)
+ synchronous_commit = "off";
conninfo = stmt->conninfo;
publications = stmt->publication;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CStringGetTextDatum(conninfo);
values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ values[Anum_pg_subscription_subsynccommit - 1] =
+ CStringGetTextDatum(synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publications);
@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_OPTIONS:
{
char *slot_name;
+ char *synchronous_commit;
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, &slot_name, NULL);
+ NULL, &slot_name, NULL,
+ &synchronous_commit);
- values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slot_name));
- replaces[Anum_pg_subscription_subslotname - 1] = true;
+ if (slot_name)
+ {
+ values[Anum_pg_subscription_subslotname - 1] =
+ DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+ replaces[Anum_pg_subscription_subslotname - 1] = true;
+ }
+ if (synchronous_commit)
+ {
+ values[Anum_pg_subscription_subsynccommit - 1] =
+ CStringGetTextDatum(synchronous_commit);
+ replaces[Anum_pg_subscription_subsynccommit - 1] = true;
+ }
update_tuple = true;
break;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
parse_subscription_options(stmt->options, NULL,
&enabled_given, &enabled, NULL,
- NULL, NULL);
+ NULL, NULL, NULL);
Assert(enabled_given);
values[Anum_pg_subscription_subenabled - 1] =
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
Subscription *sub = GetSubscription(subid, false);
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, &copy_data);
+ NULL, NULL, &copy_data, NULL);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
Subscription *sub = GetSubscription(subid, false);
parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, &copy_data);
+ NULL, NULL, &copy_data, NULL);
AlterSubscription_refresh(sub, copy_data);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7ba239c02c..2d663f6308 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -129,17 +129,13 @@ get_subscription_list(void)
*/
oldcxt = MemoryContextSwitchTo(resultcxt);
- sub = (Subscription *) palloc(sizeof(Subscription));
+ sub = (Subscription *) palloc0(sizeof(Subscription));
sub->oid = HeapTupleGetOid(tup);
sub->dbid = subform->subdbid;
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
-
/* We don't fill fields we are not interested in. */
- sub->conninfo = NULL;
- sub->slotname = NULL;
- sub->publications = NIL;
res = lappend(res, sub);
MemoryContextSwitchTo(oldcxt);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3313448e7b..29b6c6a168 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1416,6 +1416,10 @@ reread_subscription(void)
MemoryContextSwitchTo(oldctx);
+ /* Change synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
if (started_tx)
CommitTransactionCommand();
@@ -1485,6 +1489,10 @@ ApplyWorkerMain(Datum main_arg)
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
if (!MySubscription->enabled)
{
ereport(LOG,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1029354462..3eccfa626b 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3683,6 +3683,7 @@ getSubscriptions(Archive *fout)
int i_rolname;
int i_subconninfo;
int i_subslotname;
+ int i_subsynccommit;
int i_subpublications;
int i,
ntups;
@@ -3714,7 +3715,8 @@ getSubscriptions(Archive *fout)
appendPQExpBuffer(query,
"SELECT s.tableoid, s.oid, s.subname,"
"(%s s.subowner) AS rolname, "
- " s.subconninfo, s.subslotname, s.subpublications "
+ " s.subconninfo, s.subslotname, s.subsynccommit, "
+ " s.subpublications "
"FROM pg_catalog.pg_subscription s "
"WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
" WHERE datname = current_database())",
@@ -3729,6 +3731,7 @@ getSubscriptions(Archive *fout)
i_rolname = PQfnumber(res, "rolname");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
+ i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -3744,6 +3747,8 @@ getSubscriptions(Archive *fout)
subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
+ subinfo[i].subsynccommit =
+ pg_strdup(PQgetvalue(res, i, i_subsynccommit));
subinfo[i].subpublications =
pg_strdup(PQgetvalue(res, i, i_subpublications));
@@ -3810,6 +3815,10 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
appendPQExpBuffer(query, " PUBLICATION %s WITH (NOCONNECT, SLOT NAME = ", publications->data);
appendStringLiteralAH(query, subinfo->subslotname, fout);
+
+ if (strcmp(subinfo->subsynccommit, "off") != 0)
+ appendPQExpBuffer(query, ", SYNCHRONOUS_COMMIT = %s", fmtId(subinfo->subsynccommit));
+
appendPQExpBufferStr(query, ");\n");
appendPQExpBuffer(labelq, "SUBSCRIPTION %s", fmtId(subinfo->dobj.name));
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ba85392f11..471cfce92a 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -616,6 +616,7 @@ typedef struct _SubscriptionInfo
char *rolname;
char *subconninfo;
char *subslotname;
+ char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 2494d046b2..59121b8d1b 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5199,7 +5199,8 @@ describeSubscriptions(const char *pattern, bool verbose)
PQExpBufferData buf;
PGresult *res;
printQueryOpt myopt = pset.popt;
- static const bool translate_columns[] = {false, false, false, false, false};
+ static const bool translate_columns[] = {false, false, false, false,
+ false, false};
if (pset.sversion < 100000)
{
@@ -5225,7 +5226,9 @@ describeSubscriptions(const char *pattern, bool verbose)
if (verbose)
{
appendPQExpBuffer(&buf,
+ ", subsynccommit AS \"%s\"\n"
", subconninfo AS \"%s\"\n",
+ gettext_noop("Synchronous commit"),
gettext_noop("Conninfo"));
}
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0811880a8f..fae542b612 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -43,7 +43,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text subconninfo; /* Connection string to the publisher */
NameData subslotname; /* Slot name on publisher */
-
+ text subsynccommit; /* Synchronous commit setting for worker */
text subpublications[1]; /* List of publications subscribed to */
#endif
} FormData_pg_subscription;
@@ -54,14 +54,15 @@ typedef FormData_pg_subscription *Form_pg_subscription;
* compiler constants for pg_subscription
* ----------------
*/
-#define Natts_pg_subscription 7
+#define Natts_pg_subscription 8
#define Anum_pg_subscription_subdbid 1
#define Anum_pg_subscription_subname 2
#define Anum_pg_subscription_subowner 3
#define Anum_pg_subscription_subenabled 4
#define Anum_pg_subscription_subconninfo 5
#define Anum_pg_subscription_subslotname 6
-#define Anum_pg_subscription_subpublications 7
+#define Anum_pg_subscription_subsynccommit 7
+#define Anum_pg_subscription_subpublications 8
typedef struct Subscription
@@ -73,6 +74,7 @@ typedef struct Subscription
bool enabled; /* Indicates if the subscription is enabled */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
+ char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
} Subscription;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 8760d5970a..47531edd1b 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -46,10 +46,10 @@ CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WI
ERROR: must be superuser to create subscriptions
SET SESSION AUTHORIZATION 'regress_subscription_user';
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Conninfo
----------+---------------------------+---------+-------------+---------------------
- testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Synchronous commit | Conninfo
+---------+---------------------------+---------+-------------+--------------------+---------------------
+ testsub | regress_subscription_user | f | {testpub} | off | dbname=doesnotexist
(1 row)
ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3 NOREFRESH;
@@ -59,10 +59,10 @@ ALTER SUBSCRIPTION testsub WITH (SLOT NAME = 'newname');
ALTER SUBSCRIPTION doesnotexist CONNECTION 'dbname=doesnotexist2';
ERROR: subscription "doesnotexist" does not exist
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Conninfo
----------+---------------------------+---------+---------------------+----------------------
- testsub | regress_subscription_user | f | {testpub2,testpub3} | dbname=doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Synchronous commit | Conninfo
+---------+---------------------------+---------+---------------------+--------------------+----------------------
+ testsub | regress_subscription_user | f | {testpub2,testpub3} | off | dbname=doesnotexist2
(1 row)
BEGIN;
@@ -89,11 +89,15 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy;
ERROR: must be owner of subscription testsub
RESET ROLE;
ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
-\dRs
- List of subscriptions
- Name | Owner | Enabled | Publication
--------------+---------------------------+---------+---------------------
- testsub_foo | regress_subscription_user | f | {testpub2,testpub3}
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local);
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar);
+ERROR: invalid value for parameter "synchronous_commit": "foobar"
+HINT: Available values: local, remote_write, remote_apply, on, off.
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Synchronous commit | Conninfo
+-------------+---------------------------+---------+---------------------+--------------------+----------------------
+ testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | local | dbname=doesnotexist2
(1 row)
-- rename back to keep the rest simple
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7bdc2b3503..1b30d150ce 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -66,8 +66,10 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy;
RESET ROLE;
ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local);
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar);
-\dRs
+\dRs+
-- rename back to keep the rest simple
ALTER SUBSCRIPTION testsub_foo RENAME TO testsub;