summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/basebackup.c32
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c100
-rw-r--r--src/backend/replication/logical/launcher.c107
-rw-r--r--src/backend/replication/logical/logical.c8
-rw-r--r--src/backend/replication/logical/logicalfuncs.c18
-rw-r--r--src/backend/replication/logical/proto.c38
-rw-r--r--src/backend/replication/logical/relation.c71
-rw-r--r--src/backend/replication/logical/snapbuild.c65
-rw-r--r--src/backend/replication/logical/tablesync.c145
-rw-r--r--src/backend/replication/logical/worker.c327
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c120
-rw-r--r--src/backend/replication/slot.c8
-rw-r--r--src/backend/replication/slotfuncs.c14
-rw-r--r--src/backend/replication/syncrep.c74
-rw-r--r--src/backend/replication/walreceiver.c17
-rw-r--r--src/backend/replication/walsender.c95
16 files changed, 627 insertions, 612 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 3ee0dd5aa4..cb5f58b6ba 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -58,8 +58,8 @@ static bool sendFile(char *readfilename, char *tarfilename,
static void sendFileWithContent(const char *filename, const char *content);
static int64 _tarWriteHeader(const char *filename, const char *linktarget,
struct stat * statbuf, bool sizeonly);
-static int64 _tarWriteDir(const char *pathbuf, int basepathlen, struct stat *statbuf,
- bool sizeonly);
+static int64 _tarWriteDir(const char *pathbuf, int basepathlen, struct stat * statbuf,
+ bool sizeonly);
static void send_int8_string(StringInfoData *buf, int64 intval);
static void SendBackupHeader(List *tablespaces);
static void base_backup_cleanup(int code, Datum arg);
@@ -106,15 +106,15 @@ static const char *excludeDirContents[] =
{
/*
* Skip temporary statistics files. PG_STAT_TMP_DIR must be skipped even
- * when stats_temp_directory is set because PGSS_TEXT_FILE is always created
- * there.
+ * when stats_temp_directory is set because PGSS_TEXT_FILE is always
+ * created there.
*/
PG_STAT_TMP_DIR,
/*
- * It is generally not useful to backup the contents of this directory even
- * if the intention is to restore to another master. See backup.sgml for a
- * more detailed description.
+ * It is generally not useful to backup the contents of this directory
+ * even if the intention is to restore to another master. See backup.sgml
+ * for a more detailed description.
*/
"pg_replslot",
@@ -365,7 +365,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
dir = AllocateDir("pg_wal");
if (!dir)
ereport(ERROR,
- (errmsg("could not open directory \"%s\": %m", "pg_wal")));
+ (errmsg("could not open directory \"%s\": %m", "pg_wal")));
while ((de = ReadDir(dir, "pg_wal")) != NULL)
{
/* Does it look like a WAL segment, and is it in the range? */
@@ -404,8 +404,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
qsort(walFiles, nWalFiles, sizeof(char *), compareWalFileNames);
/*
- * There must be at least one xlog file in the pg_wal directory,
- * since we are doing backup-including-xlog.
+ * There must be at least one xlog file in the pg_wal directory, since
+ * we are doing backup-including-xlog.
*/
if (nWalFiles < 1)
ereport(ERROR,
@@ -1036,7 +1036,7 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces,
if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
- size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
+ size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
excludeFound = true;
break;
}
@@ -1281,7 +1281,7 @@ _tarWriteHeader(const char *filename, const char *linktarget,
if (!sizeonly)
{
rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size,
- statbuf->st_mode, statbuf->st_uid, statbuf->st_gid,
+ statbuf->st_mode, statbuf->st_uid, statbuf->st_gid,
statbuf->st_mtime);
switch (rc)
@@ -1295,9 +1295,9 @@ _tarWriteHeader(const char *filename, const char *linktarget,
break;
case TAR_SYMLINK_TOO_LONG:
ereport(ERROR,
- (errmsg("symbolic link target too long for tar format: "
- "file name \"%s\", target \"%s\"",
- filename, linktarget)));
+ (errmsg("symbolic link target too long for tar format: "
+ "file name \"%s\", target \"%s\"",
+ filename, linktarget)));
break;
default:
elog(ERROR, "unrecognized tar error: %d", rc);
@@ -1314,7 +1314,7 @@ _tarWriteHeader(const char *filename, const char *linktarget,
* write it as a directory anyway.
*/
static int64
-_tarWriteDir(const char *pathbuf, int basepathlen, struct stat *statbuf,
+_tarWriteDir(const char *pathbuf, int basepathlen, struct stat * statbuf,
bool sizeonly)
{
/* If symlink, write it as a directory anyway */
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9d7bb25d39..ebe9c91e98 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -40,42 +40,42 @@ void _PG_init(void);
struct WalReceiverConn
{
/* Current connection to the primary, if any */
- PGconn *streamConn;
+ PGconn *streamConn;
/* Used to remember if the connection is logical or physical */
- bool logical;
+ bool logical;
/* Buffer for currently read records */
- char *recvBuf;
+ char *recvBuf;
};
/* Prototypes for interface functions */
static WalReceiverConn *libpqrcv_connect(const char *conninfo,
- bool logical, const char *appname,
- char **err);
+ bool logical, const char *appname,
+ char **err);
static void libpqrcv_check_conninfo(const char *conninfo);
static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
- TimeLineID *primary_tli,
- int *server_version);
+ TimeLineID *primary_tli,
+ int *server_version);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
TimeLineID tli, char **filename,
char **content, int *len);
static bool libpqrcv_startstreaming(WalReceiverConn *conn,
- const WalRcvStreamOptions *options);
+ const WalRcvStreamOptions *options);
static void libpqrcv_endstreaming(WalReceiverConn *conn,
- TimeLineID *next_tli);
-static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
- pgsocket *wait_fd);
+ TimeLineID *next_tli);
+static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
+ pgsocket *wait_fd);
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
- int nbytes);
+ int nbytes);
static char *libpqrcv_create_slot(WalReceiverConn *conn,
- const char *slotname,
- bool temporary,
- CRSSnapshotAction snapshot_action,
- XLogRecPtr *lsn);
+ const char *slotname,
+ bool temporary,
+ CRSSnapshotAction snapshot_action,
+ XLogRecPtr *lsn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
- const char *query,
- const int nRetTypes,
- const Oid *retTypes);
+ const char *query,
+ const int nRetTypes,
+ const Oid *retTypes);
static void libpqrcv_disconnect(WalReceiverConn *conn);
static WalReceiverFunctionsType PQWalReceiverFunctions = {
@@ -153,7 +153,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
conn = palloc0(sizeof(WalReceiverConn));
conn->streamConn = PQconnectStartParams(keys, vals,
- /* expand_dbname = */ true);
+ /* expand_dbname = */ true);
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
{
*err = pchomp(PQerrorMessage(conn->streamConn));
@@ -216,8 +216,8 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
static void
libpqrcv_check_conninfo(const char *conninfo)
{
- PQconninfoOption *opts = NULL;
- char *err = NULL;
+ PQconninfoOption *opts = NULL;
+ char *err = NULL;
opts = PQconninfoParse(conninfo, &err);
if (opts == NULL)
@@ -362,9 +362,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
*/
if (options->logical)
{
- char *pubnames_str;
- List *pubnames;
- char *pubnames_literal;
+ char *pubnames_str;
+ List *pubnames;
+ char *pubnames_literal;
appendStringInfoString(&cmd, " (");
@@ -435,8 +435,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
* next timeline's ID, or just CommandComplete if the server was shut
* down.
*
- * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT
- * is also possible in case we aborted the copy in mid-stream.
+ * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
+ * also possible in case we aborted the copy in mid-stream.
*/
res = PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
@@ -545,9 +545,9 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
/*
* PQexec() silently discards any prior query results on the connection.
- * This is not required for this function as it's expected that the
- * caller (which is this library in all cases) will behave correctly and
- * we don't have to be backwards compatible with old libpq.
+ * This is not required for this function as it's expected that the caller
+ * (which is this library in all cases) will behave correctly and we don't
+ * have to be backwards compatible with old libpq.
*/
/*
@@ -737,9 +737,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
bool temporary, CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn)
{
- PGresult *res;
- StringInfoData cmd;
- char *snapshot;
+ PGresult *res;
+ StringInfoData cmd;
+ char *snapshot;
initStringInfo(&cmd);
@@ -777,7 +777,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
}
*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
- CStringGetDatum(PQgetvalue(res, 0, 1))));
+ CStringGetDatum(PQgetvalue(res, 0, 1))));
if (!PQgetisnull(res, 0, 2))
snapshot = pstrdup(PQgetvalue(res, 0, 2));
else
@@ -793,15 +793,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
*/
static void
libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
- const int nRetTypes, const Oid *retTypes)
+ const int nRetTypes, const Oid *retTypes)
{
- int tupn;
- int coln;
- int nfields = PQnfields(pgres);
- HeapTuple tuple;
- AttInMetadata *attinmeta;
- MemoryContext rowcontext;
- MemoryContext oldcontext;
+ int tupn;
+ int coln;
+ int nfields = PQnfields(pgres);
+ HeapTuple tuple;
+ AttInMetadata *attinmeta;
+ MemoryContext rowcontext;
+ MemoryContext oldcontext;
/* Make sure we got expected number of fields. */
if (nfields != nRetTypes)
@@ -832,7 +832,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
/* Process returned rows. */
for (tupn = 0; tupn < PQntuples(pgres); tupn++)
{
- char *cstrs[MaxTupleAttributeNumber];
+ char *cstrs[MaxTupleAttributeNumber];
CHECK_FOR_INTERRUPTS();
@@ -877,7 +877,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
if (MyDatabaseId == InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("the query interface requires a database connection")));
+ errmsg("the query interface requires a database connection")));
pgres = libpqrcv_PQexec(conn->streamConn, query);
@@ -905,7 +905,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
walres->status = WALRCV_OK_COMMAND;
break;
- /* Empty query is considered error. */
+ /* Empty query is considered error. */
case PGRES_EMPTY_QUERY:
walres->status = WALRCV_ERROR;
walres->err = _("empty query");
@@ -935,16 +935,16 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
static char *
stringlist_to_identifierstr(PGconn *conn, List *strings)
{
- ListCell *lc;
+ ListCell *lc;
StringInfoData res;
- bool first = true;
+ bool first = true;
initStringInfo(&res);
- foreach (lc, strings)
+ foreach(lc, strings)
{
- char *val = strVal(lfirst(lc));
- char *val_escaped;
+ char *val = strVal(lfirst(lc));
+ char *val_escaped;
if (first)
first = false;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 09c87d7c53..4e2c350dc7 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -57,8 +57,8 @@
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
-int max_logical_replication_workers = 4;
-int max_sync_workers_per_subscription = 2;
+int max_logical_replication_workers = 4;
+int max_sync_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@@ -68,7 +68,7 @@ typedef struct LogicalRepCtxStruct
pid_t launcher_pid;
/* Background workers. */
- LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
+ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;
LogicalRepCtxStruct *LogicalRepCtx;
@@ -83,9 +83,9 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t got_SIGTERM = false;
-static bool on_commit_launcher_wakeup = false;
+static bool on_commit_launcher_wakeup = false;
-Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
+Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
/*
@@ -122,8 +122,8 @@ get_subscription_list(void)
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
- Subscription *sub;
- MemoryContext oldcxt;
+ Subscription *sub;
+ MemoryContext oldcxt;
/*
* Allocate our results in the caller's context, not the
@@ -224,15 +224,16 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
- int i;
- LogicalRepWorker *res = NULL;
+ int i;
+ LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
if (w->in_use && w->subid == subid && w->relid == relid &&
(!only_running || w->proc))
{
@@ -251,17 +252,17 @@ void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid)
{
- BackgroundWorker bgw;
+ BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
- int i;
- int slot = 0;
- LogicalRepWorker *worker = NULL;
- int nsyncworkers;
- TimestampTz now;
+ int i;
+ int slot = 0;
+ LogicalRepWorker *worker = NULL;
+ int nsyncworkers;
+ TimestampTz now;
ereport(LOG,
- (errmsg("starting logical replication worker for subscription \"%s\"",
- subname)));
+ (errmsg("starting logical replication worker for subscription \"%s\"",
+ subname)));
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -300,7 +301,7 @@ retry:
*/
if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
{
- bool did_cleanup = false;
+ bool did_cleanup = false;
for (i = 0; i < max_logical_replication_workers; i++)
{
@@ -373,7 +374,7 @@ retry:
/* Register the new dynamic worker. */
memset(&bgw, 0, sizeof(bgw));
- bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
@@ -394,7 +395,7 @@ retry:
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
- errhint("You might need to increase max_worker_processes.")));
+ errhint("You might need to increase max_worker_processes.")));
return;
}
@@ -410,7 +411,7 @@ void
logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
- uint16 generation;
+ uint16 generation;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -435,7 +436,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
*/
while (worker->in_use && !worker->proc)
{
- int rc;
+ int rc;
LWLockRelease(LogicalRepWorkerLock);
@@ -478,7 +479,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
/* ... and wait for it to die. */
for (;;)
{
- int rc;
+ int rc;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
if (!worker->proc || worker->generation != generation)
@@ -509,7 +510,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
- LogicalRepWorker *worker;
+ LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid, relid, true);
@@ -544,18 +545,18 @@ logicalrep_worker_attach(int slot)
{
LWLockRelease(LogicalRepWorkerLock);
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication worker slot %d is empty, cannot attach",
- slot)));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication worker slot %d is empty, cannot attach",
+ slot)));
}
if (MyLogicalRepWorker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication worker slot %d is already used by "
- "another worker, cannot attach", slot)));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication worker slot %d is already used by "
+ "another worker, cannot attach", slot)));
}
MyLogicalRepWorker->proc = MyProc;
@@ -620,7 +621,7 @@ logicalrep_worker_onexit(int code, Datum arg)
void
logicalrep_worker_sigterm(SIGNAL_ARGS)
{
- int save_errno = errno;
+ int save_errno = errno;
got_SIGTERM = true;
@@ -634,7 +635,7 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
- int save_errno = errno;
+ int save_errno = errno;
got_SIGHUP = true;
@@ -651,15 +652,16 @@ logicalrep_worker_sighup(SIGNAL_ARGS)
int
logicalrep_sync_worker_count(Oid subid)
{
- int i;
- int res = 0;
+ int i;
+ int res = 0;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
if (w->subid == subid && OidIsValid(w->relid))
res++;
}
@@ -699,7 +701,7 @@ ApplyLauncherRegister(void)
return;
memset(&bgw, 0, sizeof(bgw));
- bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
@@ -729,7 +731,7 @@ ApplyLauncherShmemInit(void)
if (!found)
{
- int slot;
+ int slot;
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
@@ -783,7 +785,7 @@ ApplyLauncherWakeup(void)
void
ApplyLauncherMain(Datum main_arg)
{
- TimestampTz last_start_time = 0;
+ TimestampTz last_start_time = 0;
ereport(DEBUG1,
(errmsg("logical replication launcher started")));
@@ -813,10 +815,10 @@ ApplyLauncherMain(Datum main_arg)
int rc;
List *sublist;
ListCell *lc;
- MemoryContext subctx;
- MemoryContext oldctx;
- TimestampTz now;
- long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+ MemoryContext subctx;
+ MemoryContext oldctx;
+ TimestampTz now;
+ long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
now = GetCurrentTimestamp();
@@ -826,7 +828,7 @@ ApplyLauncherMain(Datum main_arg)
{
/* Use temporary context for the database list and worker info. */
subctx = AllocSetContextCreate(TopMemoryContext,
- "Logical Replication Launcher sublist",
+ "Logical Replication Launcher sublist",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -838,8 +840,8 @@ ApplyLauncherMain(Datum main_arg)
/* Start the missing workers for enabled subscriptions. */
foreach(lc, sublist)
{
- Subscription *sub = (Subscription *) lfirst(lc);
- LogicalRepWorker *w;
+ Subscription *sub = (Subscription *) lfirst(lc);
+ LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
w = logicalrep_worker_find(sub->oid, InvalidOid, false);
@@ -864,9 +866,9 @@ ApplyLauncherMain(Datum main_arg)
{
/*
* The wait in previous cycle was interrupted in less than
- * wal_retrieve_retry_interval since last worker was started,
- * this usually means crash of the worker, so we should retry
- * in wal_retrieve_retry_interval again.
+ * wal_retrieve_retry_interval since last worker was started, this
+ * usually means crash of the worker, so we should retry in
+ * wal_retrieve_retry_interval again.
*/
wait_time = wal_retrieve_retry_interval;
}
@@ -948,7 +950,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
Datum values[PG_STAT_GET_SUBSCRIPTION_COLS];
bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
int worker_pid;
- LogicalRepWorker worker;
+ LogicalRepWorker worker;
memcpy(&worker, &LogicalRepCtx->workers[i],
sizeof(LogicalRepWorker));
@@ -992,7 +994,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
- /* If only a single subscription was requested, and we found it, break. */
+ /*
+ * If only a single subscription was requested, and we found it,
+ * break.
+ */
if (OidIsValid(subid))
break;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7409e5ce3d..33cb01b8d0 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -118,7 +118,7 @@ StartupDecodingContext(List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
ReplicationSlot *slot;
MemoryContext context,
@@ -202,8 +202,8 @@ StartupDecodingContext(List *output_plugin_options,
* plugin contains the name of the output plugin
* output_plugin_options contains options passed to the output plugin
* read_page, prepare_write, do_write, update_progress
- * callbacks that have to be filled to perform the use-case dependent,
- * actual, work.
+ * callbacks that have to be filled to perform the use-case dependent,
+ * actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@@ -219,7 +219,7 @@ CreateInitDecodingContext(char *plugin,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgress update_progress)
{
TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 27164de093..ba4d8cc5a4 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -328,17 +328,19 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
{
LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
+
/*
* If only the confirmed_flush_lsn has changed the slot won't get
- * marked as dirty by the above. Callers on the walsender interface
- * are expected to keep track of their own progress and don't need
- * it written out. But SQL-interface users cannot specify their own
- * start positions and it's harder for them to keep track of their
- * progress, so we should make more of an effort to save it for them.
+ * marked as dirty by the above. Callers on the walsender
+ * interface are expected to keep track of their own progress and
+ * don't need it written out. But SQL-interface users cannot
+ * specify their own start positions and it's harder for them to
+ * keep track of their progress, so we should make more of an
+ * effort to save it for them.
*
- * Dirty the slot so it's written out at the next checkpoint. We'll
- * still lose its position on crash, as documented, but it's better
- * than always losing the position even on clean restart.
+ * Dirty the slot so it's written out at the next checkpoint.
+ * We'll still lose its position on crash, as documented, but it's
+ * better than always losing the position even on clean restart.
*/
ReplicationSlotMarkDirty();
}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index adc62a0f3b..ff348ff2a8 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -28,7 +28,7 @@
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
- HeapTuple tuple);
+ HeapTuple tuple);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -72,7 +72,7 @@ void
logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- uint8 flags = 0;
+ uint8 flags = 0;
pq_sendbyte(out, 'C'); /* sending COMMIT */
@@ -92,7 +92,7 @@ void
logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
{
/* read flags (unused for now) */
- uint8 flags = pq_getmsgbyte(in);
+ uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unrecognized flags %u in commit message", flags);
@@ -136,7 +136,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
* Write INSERT to the output stream.
*/
void
-logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
+logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
{
pq_sendbyte(out, 'I'); /* action INSERT */
@@ -160,7 +160,7 @@ LogicalRepRelId
logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
{
char action;
- LogicalRepRelId relid;
+ LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
@@ -180,7 +180,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
- HeapTuple newtuple)
+ HeapTuple newtuple)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
@@ -194,9 +194,9 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
if (oldtuple != NULL)
{
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- pq_sendbyte(out, 'O'); /* old tuple follows */
+ pq_sendbyte(out, 'O'); /* old tuple follows */
else
- pq_sendbyte(out, 'K'); /* old key follows */
+ pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
}
@@ -213,7 +213,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
LogicalRepTupleData *newtup)
{
char action;
- LogicalRepRelId relid;
+ LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
@@ -277,7 +277,7 @@ LogicalRepRelId
logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
{
char action;
- LogicalRepRelId relid;
+ LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
@@ -323,7 +323,7 @@ logicalrep_write_rel(StringInfo out, Relation rel)
LogicalRepRelation *
logicalrep_read_rel(StringInfo in)
{
- LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
+ LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
rel->remoteid = pq_getmsgint(in, 4);
@@ -424,12 +424,12 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
if (isnull[i])
{
- pq_sendbyte(out, 'n'); /* null column */
+ pq_sendbyte(out, 'n'); /* null column */
continue;
}
else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
{
- pq_sendbyte(out, 'u'); /* unchanged toast column */
+ pq_sendbyte(out, 'u'); /* unchanged toast column */
continue;
}
@@ -473,21 +473,21 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
switch (kind)
{
- case 'n': /* null */
+ case 'n': /* null */
tuple->values[i] = NULL;
tuple->changed[i] = true;
break;
- case 'u': /* unchanged column */
+ case 'u': /* unchanged column */
/* we don't receive the value of an unchanged column */
tuple->values[i] = NULL;
break;
- case 't': /* text formatted value */
+ case 't': /* text formatted value */
{
int len;
tuple->changed[i] = true;
- len = pq_getmsgint(in, 4); /* read length */
+ len = pq_getmsgint(in, 4); /* read length */
/* and data */
tuple->values[i] = palloc(len + 1);
@@ -534,7 +534,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = desc->attrs[i];
- uint8 flags = 0;
+ uint8 flags = 0;
if (att->attisdropped)
continue;
@@ -612,7 +612,7 @@ logicalrep_write_namespace(StringInfo out, Oid nspid)
pq_sendbyte(out, '\0');
else
{
- char *nspname = get_namespace_name(nspid);
+ char *nspname = get_namespace_name(nspid);
if (nspname == NULL)
elog(ERROR, "cache lookup failed for namespace %u",
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 590355a846..41eff8971a 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -30,13 +30,13 @@
#include "utils/memutils.h"
#include "utils/syscache.h"
-static MemoryContext LogicalRepRelMapContext = NULL;
+static MemoryContext LogicalRepRelMapContext = NULL;
-static HTAB *LogicalRepRelMap = NULL;
-static HTAB *LogicalRepTypMap = NULL;
+static HTAB *LogicalRepRelMap = NULL;
+static HTAB *LogicalRepTypMap = NULL;
static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
- uint32 hashvalue);
+ uint32 hashvalue);
/*
* Relcache invalidation callback for our relation map cache.
@@ -44,7 +44,7 @@ static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
static void
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
- LogicalRepRelMapEntry *entry;
+ LogicalRepRelMapEntry *entry;
/* Just to be sure. */
if (LogicalRepRelMap == NULL)
@@ -110,7 +110,7 @@ logicalrep_relmap_init(void)
/* This will usually be small. */
LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
- HASH_ELEM | HASH_BLOBS |HASH_CONTEXT);
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
@@ -134,7 +134,7 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
if (remoterel->natts > 0)
{
- int i;
+ int i;
for (i = 0; i < remoterel->natts; i++)
pfree(remoterel->attnames[i]);
@@ -157,10 +157,10 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
- MemoryContext oldctx;
- LogicalRepRelMapEntry *entry;
- bool found;
- int i;
+ MemoryContext oldctx;
+ LogicalRepRelMapEntry *entry;
+ bool found;
+ int i;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
@@ -202,7 +202,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
static int
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
{
- int i;
+ int i;
for (i = 0; i < remoterel->natts; i++)
{
@@ -222,7 +222,7 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
- LogicalRepRelMapEntry *entry;
+ LogicalRepRelMapEntry *entry;
bool found;
if (LogicalRepRelMap == NULL)
@@ -245,7 +245,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Bitmapset *idkey;
TupleDesc desc;
LogicalRepRelation *remoterel;
- MemoryContext oldctx;
+ MemoryContext oldctx;
+
remoterel = &entry->remoterel;
/* Try to find and lock the relation by name. */
@@ -265,8 +266,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
/*
* Build the mapping of local attribute numbers to remote attribute
- * numbers and validate that we don't miss any replicated columns
- * as that would result in potentially unwanted data loss.
+ * numbers and validate that we don't miss any replicated columns as
+ * that would result in potentially unwanted data loss.
*/
desc = RelationGetDescr(entry->localrel);
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
@@ -276,8 +277,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
found = 0;
for (i = 0; i < desc->natts; i++)
{
- int attnum = logicalrep_rel_att_by_name(remoterel,
- NameStr(desc->attrs[i]->attname));
+ int attnum = logicalrep_rel_att_by_name(remoterel,
+ NameStr(desc->attrs[i]->attname));
+
entry->attrmap[i] = attnum;
if (attnum >= 0)
found++;
@@ -287,9 +289,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (found < remoterel->natts)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication target relation \"%s.%s\" is missing "
- "some replicated columns",
- remoterel->nspname, remoterel->relname)));
+ errmsg("logical replication target relation \"%s.%s\" is missing "
+ "some replicated columns",
+ remoterel->nspname, remoterel->relname)));
/*
* Check that replica identity matches. We allow for stricter replica
@@ -299,8 +301,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
* but in the opposite scenario it will.
*
* Don't throw any error here just mark the relation entry as not
- * updatable, as replica identity is only for updates and deletes
- * but inserts can be replicated even without it.
+ * updatable, as replica identity is only for updates and deletes but
+ * inserts can be replicated even without it.
*/
entry->updatable = true;
idkey = RelationGetIndexAttrBitmap(entry->localrel,
@@ -310,6 +312,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
/*
* If no replica identity index and no PK, the published table
* must have replica identity FULL.
@@ -321,14 +324,14 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
i = -1;
while ((i = bms_next_member(idkey, i)) >= 0)
{
- int attnum = i + FirstLowInvalidHeapAttributeNumber;
+ int attnum = i + FirstLowInvalidHeapAttributeNumber;
if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication target relation \"%s.%s\" uses "
- "system columns in REPLICA IDENTITY index",
- remoterel->nspname, remoterel->relname)));
+ errmsg("logical replication target relation \"%s.%s\" uses "
+ "system columns in REPLICA IDENTITY index",
+ remoterel->nspname, remoterel->relname)));
attnum = AttrNumberGetAttrOffset(attnum);
@@ -371,7 +374,7 @@ static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
- LogicalRepTyp *entry;
+ LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
@@ -402,9 +405,9 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
void
logicalrep_typmap_update(LogicalRepTyp *remotetyp)
{
- MemoryContext oldctx;
- LogicalRepTyp *entry;
- bool found;
+ MemoryContext oldctx;
+ LogicalRepTyp *entry;
+ bool found;
if (LogicalRepTypMap == NULL)
logicalrep_relmap_init();
@@ -433,9 +436,9 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Oid
logicalrep_typmap_getid(Oid remoteid)
{
- LogicalRepTyp *entry;
- bool found;
- Oid nspoid;
+ LogicalRepTyp *entry;
+ bool found;
+ Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 428d7aa55e..8848f5b4ec 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -59,7 +59,7 @@
* by the following graph describing the SnapBuild->state transitions:
*
* +-------------------------+
- * +----| START |-------------+
+ * +----| START |-------------+
* | +-------------------------+ |
* | | |
* | | |
@@ -68,22 +68,22 @@
* | | |
* | v |
* | +-------------------------+ v
- * | | BUILDING_SNAPSHOT |------------>|
+ * | | BUILDING_SNAPSHOT |------------>|
* | +-------------------------+ |
* | | |
* | | |
- * | running_xacts #2, xacts from #1 finished |
+ * | running_xacts #2, xacts from #1 finished |
* | | |
* | | |
* | v |
* | +-------------------------+ v
- * | | FULL_SNAPSHOT |------------>|
+ * | | FULL_SNAPSHOT |------------>|
* | +-------------------------+ |
* | | |
* running_xacts | saved snapshot
* with zero xacts | at running_xacts's lsn
* | | |
- * | running_xacts with xacts from #2 finished |
+ * | running_xacts with xacts from #2 finished |
* | | |
* | v |
* | +-------------------------+ |
@@ -209,9 +209,9 @@ struct SnapBuild
TransactionId was_xmin;
TransactionId was_xmax;
- size_t was_xcnt; /* number of used xip entries */
- size_t was_xcnt_space; /* allocated size of xip */
- TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
+ size_t was_xcnt; /* number of used xip entries */
+ size_t was_xcnt_space; /* allocated size of xip */
+ TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
} was_running;
/*
@@ -608,8 +608,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
{
if (newxcnt >= GetMaxSnapshotXidCount())
ereport(ERROR,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("initial slot snapshot too large")));
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("initial slot snapshot too large")));
newxip[newxcnt++] = xid;
}
@@ -986,6 +986,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
if (NormalTransactionIdFollows(subxid, xmax))
xmax = subxid;
}
+
/*
* If we're forcing timetravel we also need visibility information
* about subtransaction, so keep track of subtransaction's state, even
@@ -1031,8 +1032,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
/*
* Adjust xmax of the snapshot builder, we only do that for committed,
- * catalog modifying, transactions, everything else isn't interesting
- * for us since we'll never look at the respective rows.
+ * catalog modifying, transactions, everything else isn't interesting for
+ * us since we'll never look at the respective rows.
*/
if (needs_timetravel &&
(!TransactionIdIsValid(builder->xmax) ||
@@ -1130,8 +1131,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
running->oldestRunningXid);
/*
- * Increase shared memory limits, so vacuum can work on tuples we prevented
- * from being pruned till now.
+ * Increase shared memory limits, so vacuum can work on tuples we
+ * prevented from being pruned till now.
*/
LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
@@ -1202,11 +1203,11 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* modifying transactions.
*
* c) First incrementally build a snapshot for catalog tuples
- * (BUILDING_SNAPSHOT), that requires all, already in-progress,
- * transactions to finish. Every transaction starting after that
- * (FULL_SNAPSHOT state), has enough information to be decoded. But
- * for older running transactions no viable snapshot exists yet, so
- * CONSISTENT will only be reached once all of those have finished.
+ * (BUILDING_SNAPSHOT), that requires all, already in-progress,
+ * transactions to finish. Every transaction starting after that
+ * (FULL_SNAPSHOT state), has enough information to be decoded. But
+ * for older running transactions no viable snapshot exists yet, so
+ * CONSISTENT will only be reached once all of those have finished.
* ---
*/
@@ -1271,6 +1272,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
/* there won't be any state to cleanup */
return false;
}
+
/*
* c) transition from START to BUILDING_SNAPSHOT.
*
@@ -1308,6 +1310,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
SnapBuildWaitSnapshot(running, running->nextXid);
}
+
/*
* c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
*
@@ -1324,13 +1327,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
SnapBuildStartNextPhaseAt(builder, running->nextXid);
ereport(LOG,
- (errmsg("logical decoding found initial consistent point at %X/%X",
- (uint32) (lsn >> 32), (uint32) lsn),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ (errmsg("logical decoding found initial consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid)));
SnapBuildWaitSnapshot(running, running->nextXid);
}
+
/*
* c) transition from FULL_SNAPSHOT to CONSISTENT.
*
@@ -1368,9 +1372,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
*
* This isn't required for the correctness of decoding, but to:
* a) allow isolationtester to notice that we're currently waiting for
- * something.
+ * something.
* b) log a new xl_running_xacts record where it'd be helpful, without having
- * to write for bgwriter or checkpointer.
+ * to write for bgwriter or checkpointer.
* ---
*/
static void
@@ -1383,9 +1387,9 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
TransactionId xid = running->xids[off];
/*
- * Upper layers should prevent that we ever need to wait on
- * ourselves. Check anyway, since failing to do so would either
- * result in an endless wait or an Assert() failure.
+ * Upper layers should prevent that we ever need to wait on ourselves.
+ * Check anyway, since failing to do so would either result in an
+ * endless wait or an Assert() failure.
*/
if (TransactionIdIsCurrentTransactionId(xid))
elog(ERROR, "waiting for ourselves");
@@ -1864,8 +1868,9 @@ CheckPointSnapBuild(void)
char path[MAXPGPATH + 21];
/*
- * We start off with a minimum of the last redo pointer. No new replication
- * slot will start before that, so that's a safe upper bound for removal.
+ * We start off with a minimum of the last redo pointer. No new
+ * replication slot will start before that, so that's a safe upper bound
+ * for removal.
*/
redo = GetRedoRecPtr();
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 7e51076b37..1e3753b8fe 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -113,7 +113,8 @@ StringInfo copybuf = NULL;
/*
* Exit routine for synchronization worker.
*/
-static void pg_attribute_noreturn()
+static void
+pg_attribute_noreturn()
finish_sync_worker(void)
{
/*
@@ -148,12 +149,12 @@ finish_sync_worker(void)
static bool
wait_for_sync_status_change(Oid relid, char origstate)
{
- int rc;
- char state = origstate;
+ int rc;
+ char state = origstate;
while (!got_SIGTERM)
{
- LogicalRepWorker *worker;
+ LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
@@ -269,7 +270,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
struct tablesync_start_time_mapping
{
Oid relid;
- TimestampTz last_start_time;
+ TimestampTz last_start_time;
};
static List *table_states = NIL;
static HTAB *last_start_times = NULL;
@@ -281,9 +282,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* We need up to date sync state info for subscription tables here. */
if (!table_states_valid)
{
- MemoryContext oldctx;
- List *rstates;
- ListCell *lc;
+ MemoryContext oldctx;
+ List *rstates;
+ ListCell *lc;
SubscriptionRelState *rstate;
/* Clean the old list. */
@@ -294,7 +295,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
started_tx = true;
/* Fetch all non-ready tables. */
- rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
+ rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
@@ -324,6 +325,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
last_start_times = hash_create("Logical replication table sync worker start times",
256, &ctl, HASH_ELEM | HASH_BLOBS);
}
+
/*
* Clean up the hash table when we're done with all tables (just to
* release the bit of memory).
@@ -337,14 +339,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* Process all tables that are being synchronized. */
foreach(lc, table_states)
{
- SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
if (rstate->state == SUBREL_STATE_SYNCDONE)
{
/*
- * Apply has caught up to the position where the table sync
- * has finished. Time to mark the table as ready so that
- * apply will just continue to replicate it normally.
+ * Apply has caught up to the position where the table sync has
+ * finished. Time to mark the table as ready so that apply will
+ * just continue to replicate it normally.
*/
if (current_lsn >= rstate->lsn)
{
@@ -362,8 +364,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
else
{
- LogicalRepWorker *syncworker;
- int nsyncworkers = 0;
+ LogicalRepWorker *syncworker;
+ int nsyncworkers = 0;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
@@ -376,6 +378,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
SpinLockRelease(&syncworker->relmutex);
}
else
+
/*
* If no sync worker for this table yet, count running sync
* workers for this subscription, while we have the lock, for
@@ -394,16 +397,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* There are three possible synchronization situations here.
*
* a) Apply is in front of the table sync: We tell the table
- * sync to CATCHUP.
+ * sync to CATCHUP.
*
* b) Apply is behind the table sync: We tell the table sync
- * to mark the table as SYNCDONE and finish.
-
+ * to mark the table as SYNCDONE and finish.
+ *
* c) Apply and table sync are at the same position: We tell
- * table sync to mark the table as READY and finish.
+ * table sync to mark the table as READY and finish.
*
- * In any case we'll need to wait for table sync to change
- * the state in catalog and only then continue ourselves.
+ * In any case we'll need to wait for table sync to change the
+ * state in catalog and only then continue ourselves.
*/
if (current_lsn > rstate->lsn)
{
@@ -427,20 +430,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
logicalrep_worker_wakeup_ptr(syncworker);
/*
- * Enter busy loop and wait for synchronization status
- * change.
+ * Enter busy loop and wait for synchronization status change.
*/
wait_for_sync_status_change(rstate->relid, rstate->state);
}
/*
- * If there is no sync worker registered for the table and
- * there is some free sync worker slot, start new sync worker
- * for the table.
+ * If there is no sync worker registered for the table and there
+ * is some free sync worker slot, start new sync worker for the
+ * table.
*/
else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
{
- TimestampTz now = GetCurrentTimestamp();
+ TimestampTz now = GetCurrentTimestamp();
struct tablesync_start_time_mapping *hentry;
bool found;
@@ -492,7 +494,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
for (i = 0; i < desc->natts; i++)
{
- int remoteattnum = rel->attrmap[i];
+ int remoteattnum = rel->attrmap[i];
/* Skip dropped attributes. */
if (desc->attrs[i]->attisdropped)
@@ -503,7 +505,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
continue;
attnamelist = lappend(attnamelist,
- makeString(rel->remoterel.attnames[remoteattnum]));
+ makeString(rel->remoterel.attnames[remoteattnum]));
}
return attnamelist;
@@ -516,8 +518,8 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
static int
copy_read_data(void *outbuf, int minread, int maxread)
{
- int bytesread = 0;
- int avail;
+ int bytesread = 0;
+ int avail;
/* If there are some leftover data from previous read, use them. */
avail = copybuf->len - copybuf->cursor;
@@ -601,13 +603,13 @@ static void
fetch_remote_table_info(char *nspname, char *relname,
LogicalRepRelation *lrel)
{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[2] = {OIDOID, CHAROID};
- Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
- bool isnull;
- int natt;
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {OIDOID, CHAROID};
+ Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+ bool isnull;
+ int natt;
lrel->nspname = nspname;
lrel->relname = relname;
@@ -615,14 +617,14 @@ fetch_remote_table_info(char *nspname, char *relname,
/* First fetch Oid and replica identity. */
initStringInfo(&cmd);
appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
- " FROM pg_catalog.pg_class c"
- " INNER JOIN pg_catalog.pg_namespace n"
- " ON (c.relnamespace = n.oid)"
- " WHERE n.nspname = %s"
- " AND c.relname = %s"
- " AND c.relkind = 'r'",
- quote_literal_cstr(nspname),
- quote_literal_cstr(relname));
+ " FROM pg_catalog.pg_class c"
+ " INNER JOIN pg_catalog.pg_namespace n"
+ " ON (c.relnamespace = n.oid)"
+ " WHERE n.nspname = %s"
+ " AND c.relname = %s"
+ " AND c.relkind = 'r'",
+ quote_literal_cstr(nspname),
+ quote_literal_cstr(relname));
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
if (res->status != WALRCV_OK_TUPLES)
@@ -653,7 +655,7 @@ fetch_remote_table_info(char *nspname, char *relname,
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
" LEFT JOIN pg_catalog.pg_index i"
- " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
+ " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
" WHERE a.attnum > 0::pg_catalog.int2"
" AND NOT a.attisdropped"
" AND a.attrelid = %u"
@@ -686,7 +688,7 @@ fetch_remote_table_info(char *nspname, char *relname,
/* Should never happen. */
if (++natt >= MaxTupleAttributeNumber)
elog(ERROR, "too many columns in remote table \"%s.%s\"",
- nspname, relname);
+ nspname, relname);
ExecClearTuple(slot);
}
@@ -707,9 +709,9 @@ static void
copy_table(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
- LogicalRepRelation lrel;
- WalRcvExecResult *res;
- StringInfoData cmd;
+ LogicalRepRelation lrel;
+ WalRcvExecResult *res;
+ StringInfoData cmd;
CopyState cstate;
List *attnamelist;
ParseState *pstate;
@@ -759,8 +761,8 @@ copy_table(Relation rel)
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
- char *slotname;
- char *err;
+ char *slotname;
+ char *err;
char relstate;
XLogRecPtr relstate_lsn;
@@ -783,7 +785,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* NAMEDATALEN on the remote that matters, but this scheme will also work
* reasonably if that is different.)
*/
- StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
+ StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
slotname = psprintf("%.*s_%u_sync_%u",
NAMEDATALEN - 28,
MySubscription->slotname,
@@ -801,7 +803,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
case SUBREL_STATE_DATASYNC:
{
Relation rel;
- WalRcvExecResult *res;
+ WalRcvExecResult *res;
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -818,24 +820,23 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
pgstat_report_stat(false);
/*
- * We want to do the table data sync in single
- * transaction.
+ * We want to do the table data sync in single transaction.
*/
StartTransactionCommand();
/*
* Use standard write lock here. It might be better to
- * disallow access to table while it's being synchronized.
- * But we don't want to block the main apply process from
- * working and it has to open relation in RowExclusiveLock
- * when remapping remote relation id to local one.
+ * disallow access to table while it's being synchronized. But
+ * we don't want to block the main apply process from working
+ * and it has to open relation in RowExclusiveLock when
+ * remapping remote relation id to local one.
*/
rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);
/*
- * Create temporary slot for the sync process.
- * We do this inside transaction so that we can use the
- * snapshot made by the slot to get existing data.
+ * Create temporary slot for the sync process. We do this
+ * inside transaction so that we can use the snapshot made by
+ * the slot to get existing data.
*/
res = walrcv_exec(wrconn,
"BEGIN READ ONLY ISOLATION LEVEL "
@@ -849,10 +850,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/*
* Create new temporary logical decoding slot.
*
- * We'll use slot for data copy so make sure the snapshot
- * is used for the transaction, that way the COPY will get
- * data that is consistent with the lsn used by the slot
- * to start decoding.
+ * We'll use slot for data copy so make sure the snapshot is
+ * used for the transaction, that way the COPY will get data
+ * that is consistent with the lsn used by the slot to start
+ * decoding.
*/
walrcv_create_slot(wrconn, slotname, true,
CRS_USE_SNAPSHOT, origin_startpos);
@@ -872,8 +873,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommandCounterIncrement();
/*
- * We are done with the initial data synchronization,
- * update the state.
+ * We are done with the initial data synchronization, update
+ * the state.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
@@ -881,8 +882,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
- * Wait for main apply worker to either tell us to
- * catchup or that we are done.
+ * Wait for main apply worker to either tell us to catchup or
+ * that we are done.
*/
wait_for_sync_status_change(MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04813b506e..9d1eab9e1e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -88,29 +88,29 @@
typedef struct FlushPosition
{
- dlist_node node;
- XLogRecPtr local_end;
- XLogRecPtr remote_end;
+ dlist_node node;
+ XLogRecPtr local_end;
+ XLogRecPtr remote_end;
} FlushPosition;
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
+ LogicalRepRelation *rel;
int attnum;
} SlotErrCallbackArg;
-static MemoryContext ApplyMessageContext = NULL;
-MemoryContext ApplyContext = NULL;
+static MemoryContext ApplyMessageContext = NULL;
+MemoryContext ApplyContext = NULL;
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *wrconn = NULL;
-Subscription *MySubscription = NULL;
-bool MySubscriptionValid = false;
+Subscription *MySubscription = NULL;
+bool MySubscriptionValid = false;
-bool in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+bool in_remote_transaction = false;
+static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
@@ -215,7 +215,7 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
*/
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
- TupleTableSlot *slot)
+ TupleTableSlot *slot)
{
TupleDesc desc = RelationGetDescr(rel->localrel);
int num_phys_attrs = desc->natts;
@@ -271,9 +271,9 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
- Oid remotetypoid,
- localtypoid;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ Oid remotetypoid,
+ localtypoid;
if (errarg->attnum < 0)
return;
@@ -295,12 +295,12 @@ slot_store_error_callback(void *arg)
*/
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
- char **values)
+ char **values)
{
- int natts = slot->tts_tupleDescriptor->natts;
- int i;
- SlotErrCallbackArg errarg;
- ErrorContextCallback errcallback;
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
ExecClearTuple(slot);
@@ -315,14 +315,14 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
/* Call the "in" function for each non-dropped attribute */
for (i = 0; i < natts; i++)
{
- Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
- int remoteattnum = rel->attrmap[i];
+ Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+ int remoteattnum = rel->attrmap[i];
if (!att->attisdropped && remoteattnum >= 0 &&
values[remoteattnum] != NULL)
{
- Oid typinput;
- Oid typioparam;
+ Oid typinput;
+ Oid typioparam;
errarg.attnum = remoteattnum;
@@ -359,12 +359,12 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
*/
static void
slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
- char **values, bool *replaces)
+ char **values, bool *replaces)
{
- int natts = slot->tts_tupleDescriptor->natts;
- int i;
- SlotErrCallbackArg errarg;
- ErrorContextCallback errcallback;
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
slot_getallattrs(slot);
ExecClearTuple(slot);
@@ -380,16 +380,16 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
/* Call the "in" function for each replaced attribute */
for (i = 0; i < natts; i++)
{
- Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
- int remoteattnum = rel->attrmap[i];
+ Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+ int remoteattnum = rel->attrmap[i];
if (remoteattnum >= 0 && !replaces[remoteattnum])
continue;
if (remoteattnum >= 0 && values[remoteattnum] != NULL)
{
- Oid typinput;
- Oid typioparam;
+ Oid typinput;
+ Oid typioparam;
errarg.attnum = remoteattnum;
@@ -418,7 +418,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
static void
apply_handle_begin(StringInfo s)
{
- LogicalRepBeginData begin_data;
+ LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
@@ -437,7 +437,7 @@ apply_handle_begin(StringInfo s)
static void
apply_handle_commit(StringInfo s)
{
- LogicalRepCommitData commit_data;
+ LogicalRepCommitData commit_data;
logicalrep_read_commit(s, &commit_data);
@@ -476,8 +476,8 @@ static void
apply_handle_origin(StringInfo s)
{
/*
- * ORIGIN message can only come inside remote transaction and before
- * any actual writes.
+ * ORIGIN message can only come inside remote transaction and before any
+ * actual writes.
*/
if (!in_remote_transaction ||
(IsTransactionState() && !am_tablesync_worker()))
@@ -497,7 +497,7 @@ apply_handle_origin(StringInfo s)
static void
apply_handle_relation(StringInfo s)
{
- LogicalRepRelation *rel;
+ LogicalRepRelation *rel;
rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);
@@ -512,7 +512,7 @@ apply_handle_relation(StringInfo s)
static void
apply_handle_type(StringInfo s)
{
- LogicalRepTyp typ;
+ LogicalRepTyp typ;
logicalrep_read_typ(s, &typ);
logicalrep_typmap_update(&typ);
@@ -526,7 +526,7 @@ apply_handle_type(StringInfo s)
static Oid
GetRelationIdentityOrPK(Relation rel)
{
- Oid idxoid;
+ Oid idxoid;
idxoid = RelationGetReplicaIndex(rel);
@@ -543,11 +543,11 @@ static void
apply_handle_insert(StringInfo s)
{
LogicalRepRelMapEntry *rel;
- LogicalRepTupleData newtup;
- LogicalRepRelId relid;
- EState *estate;
- TupleTableSlot *remoteslot;
- MemoryContext oldctx;
+ LogicalRepTupleData newtup;
+ LogicalRepRelId relid;
+ EState *estate;
+ TupleTableSlot *remoteslot;
+ MemoryContext oldctx;
ensure_transaction();
@@ -607,15 +607,15 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
return;
/*
- * We are in error mode so it's fine this is somewhat slow.
- * It's better to give user correct error.
+ * We are in error mode so it's fine this is somewhat slow. It's better to
+ * give user correct error.
*/
if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publisher does not send replica identity column "
- "expected by the logical replication target relation \"%s.%s\"",
+ "expected by the logical replication target relation \"%s.%s\"",
rel->remoterel.nspname, rel->remoterel.relname)));
}
@@ -637,17 +637,17 @@ static void
apply_handle_update(StringInfo s)
{
LogicalRepRelMapEntry *rel;
- LogicalRepRelId relid;
- Oid idxoid;
- EState *estate;
- EPQState epqstate;
- LogicalRepTupleData oldtup;
- LogicalRepTupleData newtup;
- bool has_oldtup;
- TupleTableSlot *localslot;
- TupleTableSlot *remoteslot;
- bool found;
- MemoryContext oldctx;
+ LogicalRepRelId relid;
+ Oid idxoid;
+ EState *estate;
+ EPQState epqstate;
+ LogicalRepTupleData oldtup;
+ LogicalRepTupleData newtup;
+ bool has_oldtup;
+ TupleTableSlot *localslot;
+ TupleTableSlot *remoteslot;
+ bool found;
+ MemoryContext oldctx;
ensure_transaction();
@@ -685,8 +685,8 @@ apply_handle_update(StringInfo s)
MemoryContextSwitchTo(oldctx);
/*
- * Try to find tuple using either replica identity index, primary key
- * or if needed, sequential scan.
+ * Try to find tuple using either replica identity index, primary key or
+ * if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(rel->localrel);
Assert(OidIsValid(idxoid) ||
@@ -758,15 +758,15 @@ static void
apply_handle_delete(StringInfo s)
{
LogicalRepRelMapEntry *rel;
- LogicalRepTupleData oldtup;
- LogicalRepRelId relid;
- Oid idxoid;
- EState *estate;
- EPQState epqstate;
- TupleTableSlot *remoteslot;
- TupleTableSlot *localslot;
- bool found;
- MemoryContext oldctx;
+ LogicalRepTupleData oldtup;
+ LogicalRepRelId relid;
+ Oid idxoid;
+ EState *estate;
+ EPQState epqstate;
+ TupleTableSlot *remoteslot;
+ TupleTableSlot *localslot;
+ bool found;
+ MemoryContext oldctx;
ensure_transaction();
@@ -802,8 +802,8 @@ apply_handle_delete(StringInfo s)
MemoryContextSwitchTo(oldctx);
/*
- * Try to find tuple using either replica identity index, primary key
- * or if needed, sequential scan.
+ * Try to find tuple using either replica identity index, primary key or
+ * if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(rel->localrel);
Assert(OidIsValid(idxoid) ||
@@ -826,7 +826,7 @@ apply_handle_delete(StringInfo s)
}
else
{
- /* The tuple to be deleted could not be found.*/
+ /* The tuple to be deleted could not be found. */
ereport(DEBUG1,
(errmsg("logical replication could not find row for delete "
"in replication target %s",
@@ -856,46 +856,46 @@ apply_handle_delete(StringInfo s)
static void
apply_dispatch(StringInfo s)
{
- char action = pq_getmsgbyte(s);
+ char action = pq_getmsgbyte(s);
switch (action)
{
- /* BEGIN */
+ /* BEGIN */
case 'B':
apply_handle_begin(s);
break;
- /* COMMIT */
+ /* COMMIT */
case 'C':
apply_handle_commit(s);
break;
- /* INSERT */
+ /* INSERT */
case 'I':
apply_handle_insert(s);
break;
- /* UPDATE */
+ /* UPDATE */
case 'U':
apply_handle_update(s);
break;
- /* DELETE */
+ /* DELETE */
case 'D':
apply_handle_delete(s);
break;
- /* RELATION */
+ /* RELATION */
case 'R':
apply_handle_relation(s);
break;
- /* TYPE */
+ /* TYPE */
case 'Y':
apply_handle_type(s);
break;
- /* ORIGIN */
+ /* ORIGIN */
case 'O':
apply_handle_origin(s);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid logical replication message type %c", action)));
+ errmsg("invalid logical replication message type %c", action)));
}
}
@@ -925,7 +925,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
dlist_foreach_modify(iter, &lsn_mapping)
{
FlushPosition *pos =
- dlist_container(FlushPosition, node, iter.cur);
+ dlist_container(FlushPosition, node, iter.cur);
*write = pos->remote_end;
@@ -995,12 +995,12 @@ static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
/*
- * Init the ApplyMessageContext which we clean up after each
- * replication protocol message.
+ * Init the ApplyMessageContext which we clean up after each replication
+ * protocol message.
*/
ApplyMessageContext = AllocSetContextCreate(ApplyContext,
- "ApplyMessageContext",
- ALLOCSET_DEFAULT_SIZES);
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1039,7 +1039,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
else
{
- int c;
+ int c;
StringInfoData s;
/* Reset timeout. */
@@ -1108,7 +1108,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
/*
* If we didn't get any transactions for a while there might be
- * unconsumed invalidation messages in the queue, consume them now.
+ * unconsumed invalidation messages in the queue, consume them
+ * now.
*/
AcceptInvalidationMessages();
if (!MySubscriptionValid)
@@ -1126,6 +1127,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (endofstream)
{
TimeLineID tli;
+
walrcv_endstreaming(wrconn, &tli);
break;
}
@@ -1152,19 +1154,18 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (rc & WL_TIMEOUT)
{
/*
- * We didn't receive anything new. If we haven't heard
- * anything from the server for more than
- * wal_receiver_timeout / 2, ping the server. Also, if
- * it's been longer than wal_receiver_status_interval
- * since the last update we sent, send a status update to
- * the master anyway, to report any progress in applying
- * WAL.
+ * We didn't receive anything new. If we haven't heard anything
+ * from the server for more than wal_receiver_timeout / 2, ping
+ * the server. Also, if it's been longer than
+ * wal_receiver_status_interval since the last update we sent,
+ * send a status update to the master anyway, to report any
+ * progress in applying WAL.
*/
bool requestReply = false;
/*
- * Check if time since last receive from standby has
- * reached the configured limit.
+ * Check if time since last receive from standby has reached the
+ * configured limit.
*/
if (wal_receiver_timeout > 0)
{
@@ -1180,13 +1181,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
(errmsg("terminating logical replication worker due to timeout")));
/*
- * We didn't receive anything new, for half of
- * receiver replication timeout. Ping the server.
+ * We didn't receive anything new, for half of receiver
+ * replication timeout. Ping the server.
*/
if (!ping_sent)
{
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout / 2));
+ (wal_receiver_timeout / 2));
if (now >= timeout)
{
requestReply = true;
@@ -1211,17 +1212,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
- static StringInfo reply_message = NULL;
- static TimestampTz send_time = 0;
+ static StringInfo reply_message = NULL;
+ static TimestampTz send_time = 0;
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
- XLogRecPtr writepos;
- XLogRecPtr flushpos;
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
TimestampTz now;
- bool have_pending_txes;
+ bool have_pending_txes;
/*
* If the user doesn't want status to be reported to the publisher, be
@@ -1237,8 +1238,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
get_flush_position(&writepos, &flushpos, &have_pending_txes);
/*
- * No outstanding transactions to flush, we can report the latest
- * received position. This is important for synchronous replication.
+ * No outstanding transactions to flush, we can report the latest received
+ * position. This is important for synchronous replication.
*/
if (!have_pending_txes)
flushpos = writepos = recvpos;
@@ -1262,7 +1263,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
if (!reply_message)
{
- MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
reply_message = makeStringInfo();
MemoryContextSwitchTo(oldctx);
}
@@ -1273,7 +1275,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
- pq_sendint64(reply_message, now); /* sendTime */
+ pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
@@ -1300,9 +1302,9 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static void
reread_subscription(void)
{
- MemoryContext oldctx;
- Subscription *newsub;
- bool started_tx = false;
+ MemoryContext oldctx;
+ Subscription *newsub;
+ bool started_tx = false;
/* This function might be called inside or outside of transaction. */
if (!IsTransactionState())
@@ -1317,47 +1319,45 @@ reread_subscription(void)
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
/*
- * Exit if the subscription was removed.
- * This normally should not happen as the worker gets killed
- * during DROP SUBSCRIPTION.
+ * Exit if the subscription was removed. This normally should not happen
+ * as the worker gets killed during DROP SUBSCRIPTION.
*/
if (!newsub)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "stop because the subscription was removed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
- * Exit if the subscription was disabled.
- * This normally should not happen as the worker gets killed
- * during ALTER SUBSCRIPTION ... DISABLE.
+ * Exit if the subscription was disabled. This normally should not happen
+ * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
*/
if (!newsub->enabled)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "stop because the subscription was disabled",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
- * Exit if connection string was changed. The launcher will start
- * new worker.
+ * Exit if connection string was changed. The launcher will start new
+ * worker.
*/
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because the connection information was changed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because the connection information was changed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1370,9 +1370,9 @@ reread_subscription(void)
if (strcmp(newsub->name, MySubscription->name) != 0)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because subscription was renamed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because subscription was renamed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1382,30 +1382,30 @@ reread_subscription(void)
Assert(newsub->slotname);
/*
- * We need to make new connection to new slot if slot name has changed
- * so exit here as well if that's the case.
+ * We need to make new connection to new slot if slot name has changed so
+ * exit here as well if that's the case.
*/
if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because the replication slot name was changed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because the replication slot name was changed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
- * Exit if publication list was changed. The launcher will start
- * new worker.
+ * Exit if publication list was changed. The launcher will start new
+ * worker.
*/
if (!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because subscription's publications were changed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because subscription's publications were changed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1448,11 +1448,11 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
void
ApplyWorkerMain(Datum main_arg)
{
- int worker_slot = DatumGetInt32(main_arg);
- MemoryContext oldctx;
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos;
- char *myslotname;
+ int worker_slot = DatumGetInt32(main_arg);
+ MemoryContext oldctx;
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos;
+ char *myslotname;
WalRcvStreamOptions options;
/* Attach to slot */
@@ -1488,8 +1488,8 @@ ApplyWorkerMain(Datum main_arg)
/* Load the subscription into persistent memory context. */
ApplyContext = AllocSetContextCreate(TopMemoryContext,
- "ApplyContext",
- ALLOCSET_DEFAULT_SIZES);
+ "ApplyContext",
+ ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
@@ -1503,9 +1503,9 @@ ApplyWorkerMain(Datum main_arg)
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will not "
+ "start because the subscription was disabled during startup",
+ MySubscription->name)));
proc_exit(0);
}
@@ -1530,7 +1530,7 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
{
- char *syncslotname;
+ char *syncslotname;
/* This is table synchroniation worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
@@ -1545,10 +1545,10 @@ ApplyWorkerMain(Datum main_arg)
else
{
/* This is main apply worker */
- RepOriginId originid;
- TimeLineID startpointTLI;
- char *err;
- int server_version;
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char *err;
+ int server_version;
myslotname = MySubscription->slotname;
@@ -1570,9 +1570,8 @@ ApplyWorkerMain(Datum main_arg)
(errmsg("could not connect to the publisher: %s", err)));
/*
- * We don't really use the output identify_system for anything
- * but it does some initializations on the upstream so let's still
- * call it.
+ * We don't really use the output identify_system for anything but it
+ * does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(wrconn, &startpointTLI,
&server_version);
@@ -1580,8 +1579,8 @@ ApplyWorkerMain(Datum main_arg)
}
/*
- * Setup callback for syscache so that we know when something
- * changes in the subscription relation state.
+ * Setup callback for syscache so that we know when something changes in
+ * the subscription relation state.
*/
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 694f351dd8..5bdfa60ae7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -29,31 +29,31 @@ PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
-static void pgoutput_startup(LogicalDecodingContext * ctx,
- OutputPluginOptions *opt, bool is_init);
-static void pgoutput_shutdown(LogicalDecodingContext * ctx);
+static void pgoutput_startup(LogicalDecodingContext *ctx,
+ OutputPluginOptions *opt, bool is_init);
+static void pgoutput_shutdown(LogicalDecodingContext *ctx);
static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
- ReorderBufferTXN *txn);
+ ReorderBufferTXN *txn);
static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
- ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+ ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_change(LogicalDecodingContext *ctx,
- ReorderBufferTXN *txn, Relation rel,
- ReorderBufferChange *change);
+ ReorderBufferTXN *txn, Relation rel,
+ ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
- RepOriginId origin_id);
+ RepOriginId origin_id);
static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
- uint32 hashvalue);
+ uint32 hashvalue);
/* Entry in the map used to remember which relation schemas we sent. */
typedef struct RelationSyncEntry
{
- Oid relid; /* relation oid */
- bool schema_sent; /* did we send the schema? */
- bool replicate_valid;
+ Oid relid; /* relation oid */
+ bool schema_sent; /* did we send the schema? */
+ bool replicate_valid;
PublicationActions pubactions;
} RelationSyncEntry;
@@ -64,7 +64,7 @@ static void init_rel_sync_cache(MemoryContext decoding_context);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
- uint32 hashvalue);
+ uint32 hashvalue);
/*
* Specify output plugin callbacks
@@ -130,9 +130,9 @@ parse_output_parameters(List *options, uint32 *protocol_version,
if (!SplitIdentifierString(strVal(defel->arg), ',',
publication_names))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_NAME),
- errmsg("invalid publication_names syntax")));
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("invalid publication_names syntax")));
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -143,14 +143,14 @@ parse_output_parameters(List *options, uint32 *protocol_version,
* Initialize this plugin
*/
static void
-pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
- bool is_init)
+pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
+ bool is_init)
{
- PGOutputData *data = palloc0(sizeof(PGOutputData));
+ PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
data->context = AllocSetContextCreate(ctx->context,
- "logical replication output context",
+ "logical replication output context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -175,15 +175,15 @@ pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
/* Check if we support requested protocol */
if (data->protocol_version != LOGICALREP_PROTO_VERSION_NUM)
ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("client sent proto_version=%d but we only support protocol %d or lower",
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("client sent proto_version=%d but we only support protocol %d or lower",
data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("client sent proto_version=%d but we only support protocol %d or higher",
- data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("client sent proto_version=%d but we only support protocol %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
if (list_length(data->publication_names) < 1)
ereport(ERROR,
@@ -208,14 +208,14 @@ pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
- bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
if (send_replication_origin)
{
- char *origin;
+ char *origin;
/* Message boundary */
OutputPluginWrite(ctx, false);
@@ -225,10 +225,10 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
* XXX: which behaviour do we want here?
*
* Alternatives:
- * - don't send origin message if origin name not found
- * (that's what we do now)
- * - throw error - that will break replication, not good
- * - send some special "unknown" origin
+ * - don't send origin message if origin name not found
+ * (that's what we do now)
+ * - throw error - that will break replication, not good
+ * - send some special "unknown" origin
*----------
*/
if (replorigin_by_oid(txn->origin_id, true, &origin))
@@ -243,7 +243,7 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
*/
static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn)
{
OutputPluginUpdateProgress(ctx);
@@ -259,9 +259,9 @@ static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
- PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
- MemoryContext old;
- RelationSyncEntry *relentry;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ MemoryContext old;
+ RelationSyncEntry *relentry;
relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
@@ -333,8 +333,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break;
case REORDER_BUFFER_CHANGE_UPDATE:
{
- HeapTuple oldtuple = change->data.tp.oldtuple ?
- &change->data.tp.oldtuple->tuple : NULL;
+ HeapTuple oldtuple = change->data.tp.oldtuple ?
+ &change->data.tp.oldtuple->tuple : NULL;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple,
@@ -367,7 +367,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
*/
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
- RepOriginId origin_id)
+ RepOriginId origin_id)
{
return false;
}
@@ -379,7 +379,7 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
* of the ctx->context so it will be cleaned up by logical decoding machinery.
*/
static void
-pgoutput_shutdown(LogicalDecodingContext * ctx)
+pgoutput_shutdown(LogicalDecodingContext *ctx)
{
if (RelationSyncCache)
{
@@ -397,10 +397,10 @@ LoadPublications(List *pubnames)
List *result = NIL;
ListCell *lc;
- foreach (lc, pubnames)
+ foreach(lc, pubnames)
{
- char *pubname = (char *) lfirst(lc);
- Publication *pub = GetPublicationByName(pubname, false);
+ char *pubname = (char *) lfirst(lc);
+ Publication *pub = GetPublicationByName(pubname, false);
result = lappend(result, pub);
}
@@ -417,9 +417,8 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
publications_valid = false;
/*
- * Also invalidate per-relation cache so that next time the filtering
- * info is checked it will be updated with the new publication
- * settings.
+ * Also invalidate per-relation cache so that next time the filtering info
+ * is checked it will be updated with the new publication settings.
*/
rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
@@ -434,7 +433,7 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
static void
init_rel_sync_cache(MemoryContext cachectx)
{
- HASHCTL ctl;
+ HASHCTL ctl;
MemoryContext old_ctxt;
if (RelationSyncCache != NULL)
@@ -466,9 +465,9 @@ init_rel_sync_cache(MemoryContext cachectx)
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Oid relid)
{
- RelationSyncEntry *entry;
- bool found;
- MemoryContext oldctx;
+ RelationSyncEntry *entry;
+ bool found;
+ MemoryContext oldctx;
Assert(RelationSyncCache != NULL);
@@ -499,9 +498,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
}
/*
- * Build publication cache. We can't use one provided by relcache
- * as relcache considers all publications given relation is in, but
- * here we only need to consider ones that the subscriber requested.
+ * Build publication cache. We can't use one provided by relcache as
+ * relcache considers all publications given relation is in, but here
+ * we only need to consider ones that the subscriber requested.
*/
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = false;
@@ -539,7 +538,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
static void
rel_sync_cache_relation_cb(Datum arg, Oid relid)
{
- RelationSyncEntry *entry;
+ RelationSyncEntry *entry;
/*
* We can get here if the plugin was used in SQL interface as the
@@ -558,15 +557,14 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
* safe point.
*
* Getting invalidations for relations that aren't in the table is
- * entirely normal, since there's no way to unregister for an
- * invalidation event. So we don't care if it's found or not.
+ * entirely normal, since there's no way to unregister for an invalidation
+ * event. So we don't care if it's found or not.
*/
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
HASH_FIND, NULL);
/*
- * Reset schema sent status as the relation definition may have
- * changed.
+ * Reset schema sent status as the relation definition may have changed.
*/
if (entry != NULL)
entry->schema_sent = false;
@@ -578,8 +576,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
static void
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
{
- HASH_SEQ_STATUS status;
- RelationSyncEntry *entry;
+ HASH_SEQ_STATUS status;
+ RelationSyncEntry *entry;
/*
* We can get here if the plugin was used in SQL interface as the
@@ -590,8 +588,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
return;
/*
- * There is no way to find which entry in our cache the hash belongs to
- * so mark the whole cache as invalid.
+ * There is no way to find which entry in our cache the hash belongs to so
+ * mark the whole cache as invalid.
*/
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5f63d0484a..5386e86aa6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -502,8 +502,8 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
/*
* Rename the slot directory on disk, so that we'll no longer recognize
* this as a valid slot. Note that if this fails, we've got to mark the
- * slot inactive before bailing out. If we're dropping an ephemeral or
- * a temporary slot, we better never fail hard as the caller won't expect
+ * slot inactive before bailing out. If we're dropping an ephemeral or a
+ * temporary slot, we better never fail hard as the caller won't expect
* the slot to survive and this might get called during error handling.
*/
if (rename(path, tmppath) == 0)
@@ -839,8 +839,8 @@ restart:
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s;
- char *slotname;
- int active_pid;
+ char *slotname;
+ int active_pid;
s = &ReplicationSlotCtl->replication_slots[i];
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 56a9ca9651..bbd26f3d6a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -119,11 +119,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
/*
* Acquire a logical decoding slot, this will check for conflicting names.
- * Initially create persistent slot as ephemeral - that allows us to nicely
- * handle errors during initialization because it'll get dropped if this
- * transaction fails. We'll make it persistent at the end.
- * Temporary slots can be created as temporary from beginning as they get
- * dropped on error as well.
+ * Initially create persistent slot as ephemeral - that allows us to
+ * nicely handle errors during initialization because it'll get dropped if
+ * this transaction fails. We'll make it persistent at the end. Temporary
+ * slots can be created as temporary from beginning as they get dropped on
+ * error as well.
*/
ReplicationSlotCreate(NameStr(*name), true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
@@ -132,7 +132,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
* Create logical decoding context, to build the initial snapshot.
*/
ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
- false, /* do not build snapshot */
+ false, /* do not build snapshot */
logical_read_local_xlog_page, NULL, NULL,
NULL);
@@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
- ReplicationSlotPersistency persistency;
+ ReplicationSlotPersistency persistency;
TransactionId xmin;
TransactionId catalog_xmin;
XLogRecPtr restart_lsn;
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 554f783209..ad213fc454 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -102,17 +102,17 @@ static void SyncRepCancelWait(void);
static int SyncRepWakeQueue(bool all, int mode);
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
- XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr,
- bool *am_sync);
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ bool *am_sync);
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
- XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr,
- List *sync_standbys);
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ List *sync_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
- XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr,
- List *sync_standbys, uint8 nth);
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ List *sync_standbys, uint8 nth);
static int SyncRepGetStandbyPriority(void);
static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
@@ -455,7 +455,7 @@ SyncRepReleaseWaiters(void)
if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
ereport(LOG,
(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
- application_name, MyWalSnd->sync_standby_priority)));
+ application_name, MyWalSnd->sync_standby_priority)));
else
ereport(LOG,
(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
@@ -513,7 +513,7 @@ SyncRepReleaseWaiters(void)
*/
static bool
SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr, bool *am_sync)
+ XLogRecPtr *applyPtr, bool *am_sync)
{
List *sync_standbys;
@@ -542,9 +542,9 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
* oldest ones among sync standbys. In a quorum-based, they are the Nth
* latest ones.
*
- * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest positions.
- * But we use SyncRepGetOldestSyncRecPtr() for that calculation because
- * it's a bit more efficient.
+ * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
+ * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
+ * because it's a bit more efficient.
*
* XXX If the numbers of current and requested sync standbys are the same,
* we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
@@ -572,15 +572,15 @@ static void
SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, List *sync_standbys)
{
- ListCell *cell;
+ ListCell *cell;
/*
- * Scan through all sync standbys and calculate the oldest
- * Write, Flush and Apply positions.
+ * Scan through all sync standbys and calculate the oldest Write, Flush
+ * and Apply positions.
*/
- foreach (cell, sync_standbys)
+ foreach(cell, sync_standbys)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+ WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
@@ -606,23 +606,23 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
*/
static void
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+ XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
{
- ListCell *cell;
- XLogRecPtr *write_array;
- XLogRecPtr *flush_array;
- XLogRecPtr *apply_array;
- int len;
- int i = 0;
+ ListCell *cell;
+ XLogRecPtr *write_array;
+ XLogRecPtr *flush_array;
+ XLogRecPtr *apply_array;
+ int len;
+ int i = 0;
len = list_length(sync_standbys);
write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
- foreach (cell, sync_standbys)
+ foreach(cell, sync_standbys)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+ WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
SpinLockAcquire(&walsnd->mutex);
write_array[i] = walsnd->write;
@@ -654,8 +654,8 @@ SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
static int
cmp_lsn(const void *a, const void *b)
{
- XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
- XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
+ XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
+ XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 > lsn2)
return -1;
@@ -674,7 +674,7 @@ cmp_lsn(const void *a, const void *b)
* sync standby. Otherwise it's set to false.
*/
List *
-SyncRepGetSyncStandbys(bool *am_sync)
+SyncRepGetSyncStandbys(bool *am_sync)
{
/* Set default result */
if (am_sync != NULL)
@@ -702,8 +702,8 @@ SyncRepGetSyncStandbys(bool *am_sync)
static List *
SyncRepGetSyncStandbysQuorum(bool *am_sync)
{
- List *result = NIL;
- int i;
+ List *result = NIL;
+ int i;
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
* rearrangement */
@@ -730,8 +730,8 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
continue;
/*
- * Consider this standby as a candidate for quorum sync standbys
- * and append it to the result.
+ * Consider this standby as a candidate for quorum sync standbys and
+ * append it to the result.
*/
result = lappend_int(result, i);
if (am_sync != NULL && walsnd == MyWalSnd)
@@ -955,8 +955,8 @@ SyncRepGetStandbyPriority(void)
return 0;
/*
- * In quorum-based sync replication, all the standbys in the list
- * have the same priority, one.
+ * In quorum-based sync replication, all the standbys in the list have the
+ * same priority, one.
*/
return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 028170c952..2723612718 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1176,9 +1176,12 @@ XLogWalRcvSendHSFeedback(bool immed)
{
TimestampTz now;
TransactionId nextXid;
- uint32 xmin_epoch, catalog_xmin_epoch;
- TransactionId xmin, catalog_xmin;
+ uint32 xmin_epoch,
+ catalog_xmin_epoch;
+ TransactionId xmin,
+ catalog_xmin;
static TimestampTz sendTime = 0;
+
/* initially true so we always send at least one feedback message */
static bool master_has_standby_xmin = true;
@@ -1211,8 +1214,8 @@ XLogWalRcvSendHSFeedback(bool immed)
*
* Bailing out here also ensures that we don't send feedback until we've
* read our own replication slot state, so we don't tell the master to
- * discard needed xmin or catalog_xmin from any slots that may exist
- * on this replica.
+ * discard needed xmin or catalog_xmin from any slots that may exist on
+ * this replica.
*/
if (!HotStandbyActive())
return;
@@ -1232,7 +1235,7 @@ XLogWalRcvSendHSFeedback(bool immed)
* excludes the catalog_xmin.
*/
xmin = GetOldestXmin(NULL,
- PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
+ PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
@@ -1253,9 +1256,9 @@ XLogWalRcvSendHSFeedback(bool immed)
GetNextXidAndEpoch(&nextXid, &xmin_epoch);
catalog_xmin_epoch = xmin_epoch;
if (nextXid < xmin)
- xmin_epoch --;
+ xmin_epoch--;
if (nextXid < catalog_xmin)
- catalog_xmin_epoch --;
+ catalog_xmin_epoch--;
elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a899841d83..49cce38880 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -197,7 +197,7 @@ static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
/* A sample associating a WAL location with the time it was written. */
typedef struct
{
- XLogRecPtr lsn;
+ XLogRecPtr lsn;
TimestampTz time;
} WalTimeSample;
@@ -207,12 +207,12 @@ typedef struct
/* A mechanism for tracking replication lag. */
static struct
{
- XLogRecPtr last_lsn;
+ XLogRecPtr last_lsn;
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
- int write_head;
- int read_heads[NUM_SYNC_REP_WAIT_MODE];
+ int write_head;
+ int read_heads[NUM_SYNC_REP_WAIT_MODE];
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
-} LagTracker;
+} LagTracker;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
@@ -530,7 +530,7 @@ StartReplication(StartReplicationCmd *cmd)
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
+ errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
/*
* We assume here that we're logging enough information in the WAL for
@@ -580,8 +580,8 @@ StartReplication(StartReplicationCmd *cmd)
sendTimeLineIsHistoric = true;
/*
- * Check that the timeline the client requested exists, and
- * the requested start location is on that timeline.
+ * Check that the timeline the client requested exists, and the
+ * requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
@@ -599,8 +599,8 @@ StartReplication(StartReplicationCmd *cmd)
* request to start replication from the beginning of the WAL
* segment that contains switchpoint, but on the new timeline, so
* that it doesn't end up with a partial segment. If you ask for
- * too old a starting point, you'll get an error later when we fail
- * to find the requested WAL segment in pg_wal.
+ * too old a starting point, you'll get an error later when we
+ * fail to find the requested WAL segment in pg_wal.
*
* XXX: we could be more strict here and only allow a startpoint
* that's older than the switchpoint, if it's still in the same
@@ -717,9 +717,9 @@ StartReplication(StartReplicationCmd *cmd)
MemSet(nulls, false, sizeof(nulls));
/*
- * Need a tuple descriptor representing two columns.
- * int8 may seem like a surprising data type for this, but in theory
- * int4 would not be wide enough for this, as TimeLineID is unsigned.
+ * Need a tuple descriptor representing two columns. int8 may seem
+ * like a surprising data type for this, but in theory int4 would not
+ * be wide enough for this, as TimeLineID is unsigned.
*/
tupdesc = CreateTemplateTupleDesc(2, false);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
@@ -795,7 +795,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool reserve_wal_given = false;
/* Parse options */
- foreach (lc, cmd->options)
+ foreach(lc, cmd->options)
{
DefElem *defel = (DefElem *) lfirst(lc);
@@ -883,7 +883,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
if (cmd->kind == REPLICATION_KIND_LOGICAL)
{
LogicalDecodingContext *ctx;
- bool need_full_snapshot = false;
+ bool need_full_snapshot = false;
/*
* Do options check early so that we can bail before calling the
@@ -1255,10 +1255,10 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
TimestampTz now = GetCurrentTimestamp();
/*
- * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
- * to avoid flooding the lag tracker when we commit frequently.
+ * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
+ * avoid flooding the lag tracker when we commit frequently.
*/
-#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
if (!TimestampDifferenceExceeds(sendTime, now,
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
return;
@@ -1474,8 +1474,8 @@ exec_replication_command(const char *cmd_string)
SnapBuildClearExportedSnapshot();
/*
- * For aborted transactions, don't allow anything except pure SQL,
- * the exec_simple_query() will handle it correctly.
+ * For aborted transactions, don't allow anything except pure SQL, the
+ * exec_simple_query() will handle it correctly.
*/
if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
ereport(ERROR,
@@ -1744,7 +1744,7 @@ ProcessStandbyReplyMessage(void)
bool clearLagTimes;
TimestampTz now;
- static bool fullyAppliedLastTime = false;
+ static bool fullyAppliedLastTime = false;
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64(&reply_message);
@@ -1892,7 +1892,7 @@ TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
}
if (!TransactionIdPrecedesOrEquals(xid, nextXid))
- return false; /* epoch OK, but it's wrapped around */
+ return false; /* epoch OK, but it's wrapped around */
return true;
}
@@ -1974,8 +1974,8 @@ ProcessStandbyHSFeedbackMessage(void)
*
* If we're using a replication slot we reserve the xmin via that,
* otherwise via the walsender's PGXACT entry. We can only track the
- * catalog xmin separately when using a slot, so we store the least
- * of the two provided when not using a slot.
+ * catalog xmin separately when using a slot, so we store the least of the
+ * two provided when not using a slot.
*
* XXX: It might make sense to generalize the ephemeral slot concept and
* always use the slot mechanism to handle the feedback xmin.
@@ -2155,8 +2155,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
}
/*
- * At the reception of SIGUSR2, switch the WAL sender to the stopping
- * state.
+ * At the reception of SIGUSR2, switch the WAL sender to the
+ * stopping state.
*/
if (got_SIGUSR2)
WalSndSetState(WALSNDSTATE_STOPPING);
@@ -2588,18 +2588,18 @@ XLogSendPhysical(void)
* it seems good enough to capture the time here. We should reach this
* after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
* may take some time, we read the WAL flush pointer and take the time
- * very close to together here so that we'll get a later position if it
- * is still moving.
+ * very close to together here so that we'll get a later position if it is
+ * still moving.
*
* Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
* this gives us a cheap approximation for the WAL flush time for this
* LSN.
*
* Note that the LSN is not necessarily the LSN for the data contained in
- * the present message; it's the end of the WAL, which might be
- * further ahead. All the lag tracking machinery cares about is finding
- * out when that arbitrary LSN is eventually reported as written, flushed
- * and applied, so that it can measure the elapsed time.
+ * the present message; it's the end of the WAL, which might be further
+ * ahead. All the lag tracking machinery cares about is finding out when
+ * that arbitrary LSN is eventually reported as written, flushed and
+ * applied, so that it can measure the elapsed time.
*/
LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
@@ -2758,8 +2758,8 @@ XLogSendLogical(void)
if (record != NULL)
{
/*
- * Note the lack of any call to LagTrackerWrite() which is handled
- * by WalSndUpdateProgress which is called by output plugin through
+ * Note the lack of any call to LagTrackerWrite() which is handled by
+ * WalSndUpdateProgress which is called by output plugin through
* logical decoding write api.
*/
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
@@ -2805,9 +2805,8 @@ WalSndDone(WalSndSendDataCallback send_data)
/*
* To figure out whether all WAL has successfully been replicated, check
- * flush location if valid, write otherwise. Tools like pg_receivewal
- * will usually (unless in synchronous mode) return an invalid flush
- * location.
+ * flush location if valid, write otherwise. Tools like pg_receivewal will
+ * usually (unless in synchronous mode) return an invalid flush location.
*/
replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
MyWalSnd->write : MyWalSnd->flush;
@@ -3077,7 +3076,7 @@ WalSndWaitStopping(void)
if (all_stopped)
return;
- pg_usleep(10000L); /* wait for 10 msec */
+ pg_usleep(10000L); /* wait for 10 msec */
}
}
@@ -3123,7 +3122,7 @@ WalSndGetStateString(WalSndState state)
static Interval *
offset_to_interval(TimeOffset offset)
{
- Interval *result = palloc(sizeof(Interval));
+ Interval *result = palloc(sizeof(Interval));
result->month = 0;
result->day = 0;
@@ -3360,9 +3359,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
static void
LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
{
- bool buffer_full;
- int new_write_head;
- int i;
+ bool buffer_full;
+ int new_write_head;
+ int i;
if (!am_walsender)
return;
@@ -3448,16 +3447,16 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
/*
* We didn't cross a time. If there is a future sample that we
* haven't reached yet, and we've already reached at least one sample,
- * let's interpolate the local flushed time. This is mainly useful for
- * reporting a completely stuck apply position as having increasing
- * lag, since otherwise we'd have to wait for it to eventually start
- * moving again and cross one of our samples before we can show the
- * lag increasing.
+ * let's interpolate the local flushed time. This is mainly useful
+ * for reporting a completely stuck apply position as having
+ * increasing lag, since otherwise we'd have to wait for it to
+ * eventually start moving again and cross one of our samples before
+ * we can show the lag increasing.
*/
if (LagTracker.read_heads[head] != LagTracker.write_head &&
LagTracker.last_read[head].time != 0)
{
- double fraction;
+ double fraction;
WalTimeSample prev = LagTracker.last_read[head];
WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];