diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/basebackup.c | 32 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 100 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 107 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 8 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 18 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 38 | ||||
-rw-r--r-- | src/backend/replication/logical/relation.c | 71 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 65 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 145 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 327 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 120 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 8 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 14 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 74 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 17 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 95 |
16 files changed, 627 insertions, 612 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 3ee0dd5aa4..cb5f58b6ba 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -58,8 +58,8 @@ static bool sendFile(char *readfilename, char *tarfilename, static void sendFileWithContent(const char *filename, const char *content); static int64 _tarWriteHeader(const char *filename, const char *linktarget, struct stat * statbuf, bool sizeonly); -static int64 _tarWriteDir(const char *pathbuf, int basepathlen, struct stat *statbuf, - bool sizeonly); +static int64 _tarWriteDir(const char *pathbuf, int basepathlen, struct stat * statbuf, + bool sizeonly); static void send_int8_string(StringInfoData *buf, int64 intval); static void SendBackupHeader(List *tablespaces); static void base_backup_cleanup(int code, Datum arg); @@ -106,15 +106,15 @@ static const char *excludeDirContents[] = { /* * Skip temporary statistics files. PG_STAT_TMP_DIR must be skipped even - * when stats_temp_directory is set because PGSS_TEXT_FILE is always created - * there. + * when stats_temp_directory is set because PGSS_TEXT_FILE is always + * created there. */ PG_STAT_TMP_DIR, /* - * It is generally not useful to backup the contents of this directory even - * if the intention is to restore to another master. See backup.sgml for a - * more detailed description. + * It is generally not useful to backup the contents of this directory + * even if the intention is to restore to another master. See backup.sgml + * for a more detailed description. */ "pg_replslot", @@ -365,7 +365,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) dir = AllocateDir("pg_wal"); if (!dir) ereport(ERROR, - (errmsg("could not open directory \"%s\": %m", "pg_wal"))); + (errmsg("could not open directory \"%s\": %m", "pg_wal"))); while ((de = ReadDir(dir, "pg_wal")) != NULL) { /* Does it look like a WAL segment, and is it in the range? */ @@ -404,8 +404,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) qsort(walFiles, nWalFiles, sizeof(char *), compareWalFileNames); /* - * There must be at least one xlog file in the pg_wal directory, - * since we are doing backup-including-xlog. + * There must be at least one xlog file in the pg_wal directory, since + * we are doing backup-including-xlog. */ if (nWalFiles < 1) ereport(ERROR, @@ -1036,7 +1036,7 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces, if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0) { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name); - size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly); + size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly); excludeFound = true; break; } @@ -1281,7 +1281,7 @@ _tarWriteHeader(const char *filename, const char *linktarget, if (!sizeonly) { rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size, - statbuf->st_mode, statbuf->st_uid, statbuf->st_gid, + statbuf->st_mode, statbuf->st_uid, statbuf->st_gid, statbuf->st_mtime); switch (rc) @@ -1295,9 +1295,9 @@ _tarWriteHeader(const char *filename, const char *linktarget, break; case TAR_SYMLINK_TOO_LONG: ereport(ERROR, - (errmsg("symbolic link target too long for tar format: " - "file name \"%s\", target \"%s\"", - filename, linktarget))); + (errmsg("symbolic link target too long for tar format: " + "file name \"%s\", target \"%s\"", + filename, linktarget))); break; default: elog(ERROR, "unrecognized tar error: %d", rc); @@ -1314,7 +1314,7 @@ _tarWriteHeader(const char *filename, const char *linktarget, * write it as a directory anyway. */ static int64 -_tarWriteDir(const char *pathbuf, int basepathlen, struct stat *statbuf, +_tarWriteDir(const char *pathbuf, int basepathlen, struct stat * statbuf, bool sizeonly) { /* If symlink, write it as a directory anyway */ diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9d7bb25d39..ebe9c91e98 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -40,42 +40,42 @@ void _PG_init(void); struct WalReceiverConn { /* Current connection to the primary, if any */ - PGconn *streamConn; + PGconn *streamConn; /* Used to remember if the connection is logical or physical */ - bool logical; + bool logical; /* Buffer for currently read records */ - char *recvBuf; + char *recvBuf; }; /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, const char *appname, - char **err); + bool logical, const char *appname, + char **err); static void libpqrcv_check_conninfo(const char *conninfo); static char *libpqrcv_get_conninfo(WalReceiverConn *conn); static char *libpqrcv_identify_system(WalReceiverConn *conn, - TimeLineID *primary_tli, - int *server_version); + TimeLineID *primary_tli, + int *server_version); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len); static bool libpqrcv_startstreaming(WalReceiverConn *conn, - const WalRcvStreamOptions *options); + const WalRcvStreamOptions *options); static void libpqrcv_endstreaming(WalReceiverConn *conn, - TimeLineID *next_tli); -static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, - pgsocket *wait_fd); + TimeLineID *next_tli); +static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, + pgsocket *wait_fd); static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, - int nbytes); + int nbytes); static char *libpqrcv_create_slot(WalReceiverConn *conn, - const char *slotname, - bool temporary, - CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn); + const char *slotname, + bool temporary, + CRSSnapshotAction snapshot_action, + XLogRecPtr *lsn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, - const char *query, - const int nRetTypes, - const Oid *retTypes); + const char *query, + const int nRetTypes, + const Oid *retTypes); static void libpqrcv_disconnect(WalReceiverConn *conn); static WalReceiverFunctionsType PQWalReceiverFunctions = { @@ -153,7 +153,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, conn = palloc0(sizeof(WalReceiverConn)); conn->streamConn = PQconnectStartParams(keys, vals, - /* expand_dbname = */ true); + /* expand_dbname = */ true); if (PQstatus(conn->streamConn) == CONNECTION_BAD) { *err = pchomp(PQerrorMessage(conn->streamConn)); @@ -216,8 +216,8 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, static void libpqrcv_check_conninfo(const char *conninfo) { - PQconninfoOption *opts = NULL; - char *err = NULL; + PQconninfoOption *opts = NULL; + char *err = NULL; opts = PQconninfoParse(conninfo, &err); if (opts == NULL) @@ -362,9 +362,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn, */ if (options->logical) { - char *pubnames_str; - List *pubnames; - char *pubnames_literal; + char *pubnames_str; + List *pubnames; + char *pubnames_literal; appendStringInfoString(&cmd, " ("); @@ -435,8 +435,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * next timeline's ID, or just CommandComplete if the server was shut * down. * - * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT - * is also possible in case we aborted the copy in mid-stream. + * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is + * also possible in case we aborted the copy in mid-stream. */ res = PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) @@ -545,9 +545,9 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) /* * PQexec() silently discards any prior query results on the connection. - * This is not required for this function as it's expected that the - * caller (which is this library in all cases) will behave correctly and - * we don't have to be backwards compatible with old libpq. + * This is not required for this function as it's expected that the caller + * (which is this library in all cases) will behave correctly and we don't + * have to be backwards compatible with old libpq. */ /* @@ -737,9 +737,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { - PGresult *res; - StringInfoData cmd; - char *snapshot; + PGresult *res; + StringInfoData cmd; + char *snapshot; initStringInfo(&cmd); @@ -777,7 +777,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, } *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, - CStringGetDatum(PQgetvalue(res, 0, 1)))); + CStringGetDatum(PQgetvalue(res, 0, 1)))); if (!PQgetisnull(res, 0, 2)) snapshot = pstrdup(PQgetvalue(res, 0, 2)); else @@ -793,15 +793,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, - const int nRetTypes, const Oid *retTypes) + const int nRetTypes, const Oid *retTypes) { - int tupn; - int coln; - int nfields = PQnfields(pgres); - HeapTuple tuple; - AttInMetadata *attinmeta; - MemoryContext rowcontext; - MemoryContext oldcontext; + int tupn; + int coln; + int nfields = PQnfields(pgres); + HeapTuple tuple; + AttInMetadata *attinmeta; + MemoryContext rowcontext; + MemoryContext oldcontext; /* Make sure we got expected number of fields. */ if (nfields != nRetTypes) @@ -832,7 +832,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, /* Process returned rows. */ for (tupn = 0; tupn < PQntuples(pgres); tupn++) { - char *cstrs[MaxTupleAttributeNumber]; + char *cstrs[MaxTupleAttributeNumber]; CHECK_FOR_INTERRUPTS(); @@ -877,7 +877,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, if (MyDatabaseId == InvalidOid) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("the query interface requires a database connection"))); + errmsg("the query interface requires a database connection"))); pgres = libpqrcv_PQexec(conn->streamConn, query); @@ -905,7 +905,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, walres->status = WALRCV_OK_COMMAND; break; - /* Empty query is considered error. */ + /* Empty query is considered error. */ case PGRES_EMPTY_QUERY: walres->status = WALRCV_ERROR; walres->err = _("empty query"); @@ -935,16 +935,16 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, static char * stringlist_to_identifierstr(PGconn *conn, List *strings) { - ListCell *lc; + ListCell *lc; StringInfoData res; - bool first = true; + bool first = true; initStringInfo(&res); - foreach (lc, strings) + foreach(lc, strings) { - char *val = strVal(lfirst(lc)); - char *val_escaped; + char *val = strVal(lfirst(lc)); + char *val_escaped; if (first) first = false; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 09c87d7c53..4e2c350dc7 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -57,8 +57,8 @@ /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L -int max_logical_replication_workers = 4; -int max_sync_workers_per_subscription = 2; +int max_logical_replication_workers = 4; +int max_sync_workers_per_subscription = 2; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -68,7 +68,7 @@ typedef struct LogicalRepCtxStruct pid_t launcher_pid; /* Background workers. */ - LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; + LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; LogicalRepCtxStruct *LogicalRepCtx; @@ -83,9 +83,9 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker); volatile sig_atomic_t got_SIGHUP = false; volatile sig_atomic_t got_SIGTERM = false; -static bool on_commit_launcher_wakeup = false; +static bool on_commit_launcher_wakeup = false; -Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); +Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); /* @@ -122,8 +122,8 @@ get_subscription_list(void) while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) { Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); - Subscription *sub; - MemoryContext oldcxt; + Subscription *sub; + MemoryContext oldcxt; /* * Allocate our results in the caller's context, not the @@ -224,15 +224,16 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running) { - int i; - LogicalRepWorker *res = NULL; + int i; + LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* Search for attached worker for a given subscription id. */ for (i = 0; i < max_logical_replication_workers; i++) { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + if (w->in_use && w->subid == subid && w->relid == relid && (!only_running || w->proc)) { @@ -251,17 +252,17 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid) { - BackgroundWorker bgw; + BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; - int i; - int slot = 0; - LogicalRepWorker *worker = NULL; - int nsyncworkers; - TimestampTz now; + int i; + int slot = 0; + LogicalRepWorker *worker = NULL; + int nsyncworkers; + TimestampTz now; ereport(LOG, - (errmsg("starting logical replication worker for subscription \"%s\"", - subname))); + (errmsg("starting logical replication worker for subscription \"%s\"", + subname))); /* Report this after the initial starting message for consistency. */ if (max_replication_slots == 0) @@ -300,7 +301,7 @@ retry: */ if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) { - bool did_cleanup = false; + bool did_cleanup = false; for (i = 0; i < max_logical_replication_workers; i++) { @@ -373,7 +374,7 @@ retry: /* Register the new dynamic worker. */ memset(&bgw, 0, sizeof(bgw)); - bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); @@ -394,7 +395,7 @@ retry: ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of background worker slots"), - errhint("You might need to increase max_worker_processes."))); + errhint("You might need to increase max_worker_processes."))); return; } @@ -410,7 +411,7 @@ void logicalrep_worker_stop(Oid subid, Oid relid) { LogicalRepWorker *worker; - uint16 generation; + uint16 generation; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); @@ -435,7 +436,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) */ while (worker->in_use && !worker->proc) { - int rc; + int rc; LWLockRelease(LogicalRepWorkerLock); @@ -478,7 +479,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) /* ... and wait for it to die. */ for (;;) { - int rc; + int rc; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); if (!worker->proc || worker->generation != generation) @@ -509,7 +510,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) void logicalrep_worker_wakeup(Oid subid, Oid relid) { - LogicalRepWorker *worker; + LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(subid, relid, true); @@ -544,18 +545,18 @@ logicalrep_worker_attach(int slot) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication worker slot %d is empty, cannot attach", - slot))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication worker slot %d is empty, cannot attach", + slot))); } if (MyLogicalRepWorker->proc) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication worker slot %d is already used by " - "another worker, cannot attach", slot))); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication worker slot %d is already used by " + "another worker, cannot attach", slot))); } MyLogicalRepWorker->proc = MyProc; @@ -620,7 +621,7 @@ logicalrep_worker_onexit(int code, Datum arg) void logicalrep_worker_sigterm(SIGNAL_ARGS) { - int save_errno = errno; + int save_errno = errno; got_SIGTERM = true; @@ -634,7 +635,7 @@ logicalrep_worker_sigterm(SIGNAL_ARGS) void logicalrep_worker_sighup(SIGNAL_ARGS) { - int save_errno = errno; + int save_errno = errno; got_SIGHUP = true; @@ -651,15 +652,16 @@ logicalrep_worker_sighup(SIGNAL_ARGS) int logicalrep_sync_worker_count(Oid subid) { - int i; - int res = 0; + int i; + int res = 0; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* Search for attached worker for a given subscription id. */ for (i = 0; i < max_logical_replication_workers; i++) { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + if (w->subid == subid && OidIsValid(w->relid)) res++; } @@ -699,7 +701,7 @@ ApplyLauncherRegister(void) return; memset(&bgw, 0, sizeof(bgw)); - bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); @@ -729,7 +731,7 @@ ApplyLauncherShmemInit(void) if (!found) { - int slot; + int slot; memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); @@ -783,7 +785,7 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { - TimestampTz last_start_time = 0; + TimestampTz last_start_time = 0; ereport(DEBUG1, (errmsg("logical replication launcher started"))); @@ -813,10 +815,10 @@ ApplyLauncherMain(Datum main_arg) int rc; List *sublist; ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; - TimestampTz now; - long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + MemoryContext subctx; + MemoryContext oldctx; + TimestampTz now; + long wait_time = DEFAULT_NAPTIME_PER_CYCLE; now = GetCurrentTimestamp(); @@ -826,7 +828,7 @@ ApplyLauncherMain(Datum main_arg) { /* Use temporary context for the database list and worker info. */ subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", + "Logical Replication Launcher sublist", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -838,8 +840,8 @@ ApplyLauncherMain(Datum main_arg) /* Start the missing workers for enabled subscriptions. */ foreach(lc, sublist) { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); @@ -864,9 +866,9 @@ ApplyLauncherMain(Datum main_arg) { /* * The wait in previous cycle was interrupted in less than - * wal_retrieve_retry_interval since last worker was started, - * this usually means crash of the worker, so we should retry - * in wal_retrieve_retry_interval again. + * wal_retrieve_retry_interval since last worker was started, this + * usually means crash of the worker, so we should retry in + * wal_retrieve_retry_interval again. */ wait_time = wal_retrieve_retry_interval; } @@ -948,7 +950,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) Datum values[PG_STAT_GET_SUBSCRIPTION_COLS]; bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS]; int worker_pid; - LogicalRepWorker worker; + LogicalRepWorker worker; memcpy(&worker, &LogicalRepCtx->workers[i], sizeof(LogicalRepWorker)); @@ -992,7 +994,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); - /* If only a single subscription was requested, and we found it, break. */ + /* + * If only a single subscription was requested, and we found it, + * break. + */ if (OidIsValid(subid)) break; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7409e5ce3d..33cb01b8d0 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -118,7 +118,7 @@ StartupDecodingContext(List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress) { ReplicationSlot *slot; MemoryContext context, @@ -202,8 +202,8 @@ StartupDecodingContext(List *output_plugin_options, * plugin contains the name of the output plugin * output_plugin_options contains options passed to the output plugin * read_page, prepare_write, do_write, update_progress - * callbacks that have to be filled to perform the use-case dependent, - * actual, work. + * callbacks that have to be filled to perform the use-case dependent, + * actual, work. * * Needs to be called while in a memory context that's at least as long lived * as the decoding context because further memory contexts will be created @@ -219,7 +219,7 @@ CreateInitDecodingContext(char *plugin, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress) { TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 27164de093..ba4d8cc5a4 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -328,17 +328,19 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) { LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); + /* * If only the confirmed_flush_lsn has changed the slot won't get - * marked as dirty by the above. Callers on the walsender interface - * are expected to keep track of their own progress and don't need - * it written out. But SQL-interface users cannot specify their own - * start positions and it's harder for them to keep track of their - * progress, so we should make more of an effort to save it for them. + * marked as dirty by the above. Callers on the walsender + * interface are expected to keep track of their own progress and + * don't need it written out. But SQL-interface users cannot + * specify their own start positions and it's harder for them to + * keep track of their progress, so we should make more of an + * effort to save it for them. * - * Dirty the slot so it's written out at the next checkpoint. We'll - * still lose its position on crash, as documented, but it's better - * than always losing the position even on clean restart. + * Dirty the slot so it's written out at the next checkpoint. + * We'll still lose its position on crash, as documented, but it's + * better than always losing the position even on clean restart. */ ReplicationSlotMarkDirty(); } diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index adc62a0f3b..ff348ff2a8 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -28,7 +28,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple); + HeapTuple tuple); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -72,7 +72,7 @@ void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - uint8 flags = 0; + uint8 flags = 0; pq_sendbyte(out, 'C'); /* sending COMMIT */ @@ -92,7 +92,7 @@ void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) { /* read flags (unused for now) */ - uint8 flags = pq_getmsgbyte(in); + uint8 flags = pq_getmsgbyte(in); if (flags != 0) elog(ERROR, "unrecognized flags %u in commit message", flags); @@ -136,7 +136,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) +logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) { pq_sendbyte(out, 'I'); /* action INSERT */ @@ -160,7 +160,7 @@ LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) { char action; - LogicalRepRelId relid; + LogicalRepRelId relid; /* read the relation id */ relid = pq_getmsgint(in, 4); @@ -180,7 +180,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple) + HeapTuple newtuple) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -194,9 +194,9 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, if (oldtuple != NULL) { if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - pq_sendbyte(out, 'O'); /* old tuple follows */ + pq_sendbyte(out, 'O'); /* old tuple follows */ else - pq_sendbyte(out, 'K'); /* old key follows */ + pq_sendbyte(out, 'K'); /* old key follows */ logicalrep_write_tuple(out, rel, oldtuple); } @@ -213,7 +213,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *newtup) { char action; - LogicalRepRelId relid; + LogicalRepRelId relid; /* read the relation id */ relid = pq_getmsgint(in, 4); @@ -277,7 +277,7 @@ LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) { char action; - LogicalRepRelId relid; + LogicalRepRelId relid; /* read the relation id */ relid = pq_getmsgint(in, 4); @@ -323,7 +323,7 @@ logicalrep_write_rel(StringInfo out, Relation rel) LogicalRepRelation * logicalrep_read_rel(StringInfo in) { - LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation)); + LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation)); rel->remoteid = pq_getmsgint(in, 4); @@ -424,12 +424,12 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) if (isnull[i]) { - pq_sendbyte(out, 'n'); /* null column */ + pq_sendbyte(out, 'n'); /* null column */ continue; } else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) { - pq_sendbyte(out, 'u'); /* unchanged toast column */ + pq_sendbyte(out, 'u'); /* unchanged toast column */ continue; } @@ -473,21 +473,21 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) switch (kind) { - case 'n': /* null */ + case 'n': /* null */ tuple->values[i] = NULL; tuple->changed[i] = true; break; - case 'u': /* unchanged column */ + case 'u': /* unchanged column */ /* we don't receive the value of an unchanged column */ tuple->values[i] = NULL; break; - case 't': /* text formatted value */ + case 't': /* text formatted value */ { int len; tuple->changed[i] = true; - len = pq_getmsgint(in, 4); /* read length */ + len = pq_getmsgint(in, 4); /* read length */ /* and data */ tuple->values[i] = palloc(len + 1); @@ -534,7 +534,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel) for (i = 0; i < desc->natts; i++) { Form_pg_attribute att = desc->attrs[i]; - uint8 flags = 0; + uint8 flags = 0; if (att->attisdropped) continue; @@ -612,7 +612,7 @@ logicalrep_write_namespace(StringInfo out, Oid nspid) pq_sendbyte(out, '\0'); else { - char *nspname = get_namespace_name(nspid); + char *nspname = get_namespace_name(nspid); if (nspname == NULL) elog(ERROR, "cache lookup failed for namespace %u", diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 590355a846..41eff8971a 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -30,13 +30,13 @@ #include "utils/memutils.h" #include "utils/syscache.h" -static MemoryContext LogicalRepRelMapContext = NULL; +static MemoryContext LogicalRepRelMapContext = NULL; -static HTAB *LogicalRepRelMap = NULL; -static HTAB *LogicalRepTypMap = NULL; +static HTAB *LogicalRepRelMap = NULL; +static HTAB *LogicalRepTypMap = NULL; static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, - uint32 hashvalue); + uint32 hashvalue); /* * Relcache invalidation callback for our relation map cache. @@ -44,7 +44,7 @@ static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, static void logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) { - LogicalRepRelMapEntry *entry; + LogicalRepRelMapEntry *entry; /* Just to be sure. */ if (LogicalRepRelMap == NULL) @@ -110,7 +110,7 @@ logicalrep_relmap_init(void) /* This will usually be small. */ LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl, - HASH_ELEM | HASH_BLOBS |HASH_CONTEXT); + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); /* Watch for invalidation events. */ CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, @@ -134,7 +134,7 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (remoterel->natts > 0) { - int i; + int i; for (i = 0; i < remoterel->natts; i++) pfree(remoterel->attnames[i]); @@ -157,10 +157,10 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) void logicalrep_relmap_update(LogicalRepRelation *remoterel) { - MemoryContext oldctx; - LogicalRepRelMapEntry *entry; - bool found; - int i; + MemoryContext oldctx; + LogicalRepRelMapEntry *entry; + bool found; + int i; if (LogicalRepRelMap == NULL) logicalrep_relmap_init(); @@ -202,7 +202,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) static int logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) { - int i; + int i; for (i = 0; i < remoterel->natts; i++) { @@ -222,7 +222,7 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) { - LogicalRepRelMapEntry *entry; + LogicalRepRelMapEntry *entry; bool found; if (LogicalRepRelMap == NULL) @@ -245,7 +245,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) Bitmapset *idkey; TupleDesc desc; LogicalRepRelation *remoterel; - MemoryContext oldctx; + MemoryContext oldctx; + remoterel = &entry->remoterel; /* Try to find and lock the relation by name. */ @@ -265,8 +266,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) /* * Build the mapping of local attribute numbers to remote attribute - * numbers and validate that we don't miss any replicated columns - * as that would result in potentially unwanted data loss. + * numbers and validate that we don't miss any replicated columns as + * that would result in potentially unwanted data loss. */ desc = RelationGetDescr(entry->localrel); oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); @@ -276,8 +277,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) found = 0; for (i = 0; i < desc->natts; i++) { - int attnum = logicalrep_rel_att_by_name(remoterel, - NameStr(desc->attrs[i]->attname)); + int attnum = logicalrep_rel_att_by_name(remoterel, + NameStr(desc->attrs[i]->attname)); + entry->attrmap[i] = attnum; if (attnum >= 0) found++; @@ -287,9 +289,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) if (found < remoterel->natts) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication target relation \"%s.%s\" is missing " - "some replicated columns", - remoterel->nspname, remoterel->relname))); + errmsg("logical replication target relation \"%s.%s\" is missing " + "some replicated columns", + remoterel->nspname, remoterel->relname))); /* * Check that replica identity matches. We allow for stricter replica @@ -299,8 +301,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) * but in the opposite scenario it will. * * Don't throw any error here just mark the relation entry as not - * updatable, as replica identity is only for updates and deletes - * but inserts can be replicated even without it. + * updatable, as replica identity is only for updates and deletes but + * inserts can be replicated even without it. */ entry->updatable = true; idkey = RelationGetIndexAttrBitmap(entry->localrel, @@ -310,6 +312,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) { idkey = RelationGetIndexAttrBitmap(entry->localrel, INDEX_ATTR_BITMAP_PRIMARY_KEY); + /* * If no replica identity index and no PK, the published table * must have replica identity FULL. @@ -321,14 +324,14 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) i = -1; while ((i = bms_next_member(idkey, i)) >= 0) { - int attnum = i + FirstLowInvalidHeapAttributeNumber; + int attnum = i + FirstLowInvalidHeapAttributeNumber; if (!AttrNumberIsForUserDefinedAttr(attnum)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication target relation \"%s.%s\" uses " - "system columns in REPLICA IDENTITY index", - remoterel->nspname, remoterel->relname))); + errmsg("logical replication target relation \"%s.%s\" uses " + "system columns in REPLICA IDENTITY index", + remoterel->nspname, remoterel->relname))); attnum = AttrNumberGetAttrOffset(attnum); @@ -371,7 +374,7 @@ static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue) { HASH_SEQ_STATUS status; - LogicalRepTyp *entry; + LogicalRepTyp *entry; /* Just to be sure. */ if (LogicalRepTypMap == NULL) @@ -402,9 +405,9 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry) void logicalrep_typmap_update(LogicalRepTyp *remotetyp) { - MemoryContext oldctx; - LogicalRepTyp *entry; - bool found; + MemoryContext oldctx; + LogicalRepTyp *entry; + bool found; if (LogicalRepTypMap == NULL) logicalrep_relmap_init(); @@ -433,9 +436,9 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp) Oid logicalrep_typmap_getid(Oid remoteid) { - LogicalRepTyp *entry; - bool found; - Oid nspoid; + LogicalRepTyp *entry; + bool found; + Oid nspoid; /* Internal types are mapped directly. */ if (remoteid < FirstNormalObjectId) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 428d7aa55e..8848f5b4ec 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -59,7 +59,7 @@ * by the following graph describing the SnapBuild->state transitions: * * +-------------------------+ - * +----| START |-------------+ + * +----| START |-------------+ * | +-------------------------+ | * | | | * | | | @@ -68,22 +68,22 @@ * | | | * | v | * | +-------------------------+ v - * | | BUILDING_SNAPSHOT |------------>| + * | | BUILDING_SNAPSHOT |------------>| * | +-------------------------+ | * | | | * | | | - * | running_xacts #2, xacts from #1 finished | + * | running_xacts #2, xacts from #1 finished | * | | | * | | | * | v | * | +-------------------------+ v - * | | FULL_SNAPSHOT |------------>| + * | | FULL_SNAPSHOT |------------>| * | +-------------------------+ | * | | | * running_xacts | saved snapshot * with zero xacts | at running_xacts's lsn * | | | - * | running_xacts with xacts from #2 finished | + * | running_xacts with xacts from #2 finished | * | | | * | v | * | +-------------------------+ | @@ -209,9 +209,9 @@ struct SnapBuild TransactionId was_xmin; TransactionId was_xmax; - size_t was_xcnt; /* number of used xip entries */ - size_t was_xcnt_space; /* allocated size of xip */ - TransactionId *was_xip; /* running xacts array, xidComparator-sorted */ + size_t was_xcnt; /* number of used xip entries */ + size_t was_xcnt_space; /* allocated size of xip */ + TransactionId *was_xip; /* running xacts array, xidComparator-sorted */ } was_running; /* @@ -608,8 +608,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder) { if (newxcnt >= GetMaxSnapshotXidCount()) ereport(ERROR, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("initial slot snapshot too large"))); + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("initial slot snapshot too large"))); newxip[newxcnt++] = xid; } @@ -986,6 +986,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, if (NormalTransactionIdFollows(subxid, xmax)) xmax = subxid; } + /* * If we're forcing timetravel we also need visibility information * about subtransaction, so keep track of subtransaction's state, even @@ -1031,8 +1032,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* * Adjust xmax of the snapshot builder, we only do that for committed, - * catalog modifying, transactions, everything else isn't interesting - * for us since we'll never look at the respective rows. + * catalog modifying, transactions, everything else isn't interesting for + * us since we'll never look at the respective rows. */ if (needs_timetravel && (!TransactionIdIsValid(builder->xmax) || @@ -1130,8 +1131,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact running->oldestRunningXid); /* - * Increase shared memory limits, so vacuum can work on tuples we prevented - * from being pruned till now. + * Increase shared memory limits, so vacuum can work on tuples we + * prevented from being pruned till now. */ LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid); @@ -1202,11 +1203,11 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * modifying transactions. * * c) First incrementally build a snapshot for catalog tuples - * (BUILDING_SNAPSHOT), that requires all, already in-progress, - * transactions to finish. Every transaction starting after that - * (FULL_SNAPSHOT state), has enough information to be decoded. But - * for older running transactions no viable snapshot exists yet, so - * CONSISTENT will only be reached once all of those have finished. + * (BUILDING_SNAPSHOT), that requires all, already in-progress, + * transactions to finish. Every transaction starting after that + * (FULL_SNAPSHOT state), has enough information to be decoded. But + * for older running transactions no viable snapshot exists yet, so + * CONSISTENT will only be reached once all of those have finished. * --- */ @@ -1271,6 +1272,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn /* there won't be any state to cleanup */ return false; } + /* * c) transition from START to BUILDING_SNAPSHOT. * @@ -1308,6 +1310,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn SnapBuildWaitSnapshot(running, running->nextXid); } + /* * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT. * @@ -1324,13 +1327,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn SnapBuildStartNextPhaseAt(builder, running->nextXid); ereport(LOG, - (errmsg("logical decoding found initial consistent point at %X/%X", - (uint32) (lsn >> 32), (uint32) lsn), - errdetail("Waiting for transactions (approximately %d) older than %u to end.", - running->xcnt, running->nextXid))); + (errmsg("logical decoding found initial consistent point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("Waiting for transactions (approximately %d) older than %u to end.", + running->xcnt, running->nextXid))); SnapBuildWaitSnapshot(running, running->nextXid); } + /* * c) transition from FULL_SNAPSHOT to CONSISTENT. * @@ -1368,9 +1372,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * * This isn't required for the correctness of decoding, but to: * a) allow isolationtester to notice that we're currently waiting for - * something. + * something. * b) log a new xl_running_xacts record where it'd be helpful, without having - * to write for bgwriter or checkpointer. + * to write for bgwriter or checkpointer. * --- */ static void @@ -1383,9 +1387,9 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) TransactionId xid = running->xids[off]; /* - * Upper layers should prevent that we ever need to wait on - * ourselves. Check anyway, since failing to do so would either - * result in an endless wait or an Assert() failure. + * Upper layers should prevent that we ever need to wait on ourselves. + * Check anyway, since failing to do so would either result in an + * endless wait or an Assert() failure. */ if (TransactionIdIsCurrentTransactionId(xid)) elog(ERROR, "waiting for ourselves"); @@ -1864,8 +1868,9 @@ CheckPointSnapBuild(void) char path[MAXPGPATH + 21]; /* - * We start off with a minimum of the last redo pointer. No new replication - * slot will start before that, so that's a safe upper bound for removal. + * We start off with a minimum of the last redo pointer. No new + * replication slot will start before that, so that's a safe upper bound + * for removal. */ redo = GetRedoRecPtr(); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7e51076b37..1e3753b8fe 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -113,7 +113,8 @@ StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. */ -static void pg_attribute_noreturn() +static void +pg_attribute_noreturn() finish_sync_worker(void) { /* @@ -148,12 +149,12 @@ finish_sync_worker(void) static bool wait_for_sync_status_change(Oid relid, char origstate) { - int rc; - char state = origstate; + int rc; + char state = origstate; while (!got_SIGTERM) { - LogicalRepWorker *worker; + LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, @@ -269,7 +270,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) struct tablesync_start_time_mapping { Oid relid; - TimestampTz last_start_time; + TimestampTz last_start_time; }; static List *table_states = NIL; static HTAB *last_start_times = NULL; @@ -281,9 +282,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* We need up to date sync state info for subscription tables here. */ if (!table_states_valid) { - MemoryContext oldctx; - List *rstates; - ListCell *lc; + MemoryContext oldctx; + List *rstates; + ListCell *lc; SubscriptionRelState *rstate; /* Clean the old list. */ @@ -294,7 +295,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); @@ -324,6 +325,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) last_start_times = hash_create("Logical replication table sync worker start times", 256, &ctl, HASH_ELEM | HASH_BLOBS); } + /* * Clean up the hash table when we're done with all tables (just to * release the bit of memory). @@ -337,14 +339,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* Process all tables that are being synchronized. */ foreach(lc, table_states) { - SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc); + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); if (rstate->state == SUBREL_STATE_SYNCDONE) { /* - * Apply has caught up to the position where the table sync - * has finished. Time to mark the table as ready so that - * apply will just continue to replicate it normally. + * Apply has caught up to the position where the table sync has + * finished. Time to mark the table as ready so that apply will + * just continue to replicate it normally. */ if (current_lsn >= rstate->lsn) { @@ -362,8 +364,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } else { - LogicalRepWorker *syncworker; - int nsyncworkers = 0; + LogicalRepWorker *syncworker; + int nsyncworkers = 0; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, @@ -376,6 +378,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) SpinLockRelease(&syncworker->relmutex); } else + /* * If no sync worker for this table yet, count running sync * workers for this subscription, while we have the lock, for @@ -394,16 +397,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * There are three possible synchronization situations here. * * a) Apply is in front of the table sync: We tell the table - * sync to CATCHUP. + * sync to CATCHUP. * * b) Apply is behind the table sync: We tell the table sync - * to mark the table as SYNCDONE and finish. - + * to mark the table as SYNCDONE and finish. + * * c) Apply and table sync are at the same position: We tell - * table sync to mark the table as READY and finish. + * table sync to mark the table as READY and finish. * - * In any case we'll need to wait for table sync to change - * the state in catalog and only then continue ourselves. + * In any case we'll need to wait for table sync to change the + * state in catalog and only then continue ourselves. */ if (current_lsn > rstate->lsn) { @@ -427,20 +430,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) logicalrep_worker_wakeup_ptr(syncworker); /* - * Enter busy loop and wait for synchronization status - * change. + * Enter busy loop and wait for synchronization status change. */ wait_for_sync_status_change(rstate->relid, rstate->state); } /* - * If there is no sync worker registered for the table and - * there is some free sync worker slot, start new sync worker - * for the table. + * If there is no sync worker registered for the table and there + * is some free sync worker slot, start new sync worker for the + * table. */ else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) { - TimestampTz now = GetCurrentTimestamp(); + TimestampTz now = GetCurrentTimestamp(); struct tablesync_start_time_mapping *hentry; bool found; @@ -492,7 +494,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) for (i = 0; i < desc->natts; i++) { - int remoteattnum = rel->attrmap[i]; + int remoteattnum = rel->attrmap[i]; /* Skip dropped attributes. */ if (desc->attrs[i]->attisdropped) @@ -503,7 +505,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) continue; attnamelist = lappend(attnamelist, - makeString(rel->remoterel.attnames[remoteattnum])); + makeString(rel->remoterel.attnames[remoteattnum])); } return attnamelist; @@ -516,8 +518,8 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) static int copy_read_data(void *outbuf, int minread, int maxread) { - int bytesread = 0; - int avail; + int bytesread = 0; + int avail; /* If there are some leftover data from previous read, use them. */ avail = copybuf->len - copybuf->cursor; @@ -601,13 +603,13 @@ static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel) { - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[2] = {OIDOID, CHAROID}; - Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; - bool isnull; - int natt; + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {OIDOID, CHAROID}; + Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; + bool isnull; + int natt; lrel->nspname = nspname; lrel->relname = relname; @@ -615,14 +617,14 @@ fetch_remote_table_info(char *nspname, char *relname, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" - " FROM pg_catalog.pg_class c" - " INNER JOIN pg_catalog.pg_namespace n" - " ON (c.relnamespace = n.oid)" - " WHERE n.nspname = %s" - " AND c.relname = %s" - " AND c.relkind = 'r'", - quote_literal_cstr(nspname), - quote_literal_cstr(relname)); + " FROM pg_catalog.pg_class c" + " INNER JOIN pg_catalog.pg_namespace n" + " ON (c.relnamespace = n.oid)" + " WHERE n.nspname = %s" + " AND c.relname = %s" + " AND c.relkind = 'r'", + quote_literal_cstr(nspname), + quote_literal_cstr(relname)); res = walrcv_exec(wrconn, cmd.data, 2, tableRow); if (res->status != WALRCV_OK_TUPLES) @@ -653,7 +655,7 @@ fetch_remote_table_info(char *nspname, char *relname, " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" " LEFT JOIN pg_catalog.pg_index i" - " ON (i.indexrelid = pg_get_replica_identity_index(%u))" + " ON (i.indexrelid = pg_get_replica_identity_index(%u))" " WHERE a.attnum > 0::pg_catalog.int2" " AND NOT a.attisdropped" " AND a.attrelid = %u" @@ -686,7 +688,7 @@ fetch_remote_table_info(char *nspname, char *relname, /* Should never happen. */ if (++natt >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", - nspname, relname); + nspname, relname); ExecClearTuple(slot); } @@ -707,9 +709,9 @@ static void copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; - LogicalRepRelation lrel; - WalRcvExecResult *res; - StringInfoData cmd; + LogicalRepRelation lrel; + WalRcvExecResult *res; + StringInfoData cmd; CopyState cstate; List *attnamelist; ParseState *pstate; @@ -759,8 +761,8 @@ copy_table(Relation rel) char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { - char *slotname; - char *err; + char *slotname; + char *err; char relstate; XLogRecPtr relstate_lsn; @@ -783,7 +785,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * NAMEDATALEN on the remote that matters, but this scheme will also work * reasonably if that is different.) */ - StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ + StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ slotname = psprintf("%.*s_%u_sync_%u", NAMEDATALEN - 28, MySubscription->slotname, @@ -801,7 +803,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_DATASYNC: { Relation rel; - WalRcvExecResult *res; + WalRcvExecResult *res; SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; @@ -818,24 +820,23 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) pgstat_report_stat(false); /* - * We want to do the table data sync in single - * transaction. + * We want to do the table data sync in single transaction. */ StartTransactionCommand(); /* * Use standard write lock here. It might be better to - * disallow access to table while it's being synchronized. - * But we don't want to block the main apply process from - * working and it has to open relation in RowExclusiveLock - * when remapping remote relation id to local one. + * disallow access to table while it's being synchronized. But + * we don't want to block the main apply process from working + * and it has to open relation in RowExclusiveLock when + * remapping remote relation id to local one. */ rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock); /* - * Create temporary slot for the sync process. - * We do this inside transaction so that we can use the - * snapshot made by the slot to get existing data. + * Create temporary slot for the sync process. We do this + * inside transaction so that we can use the snapshot made by + * the slot to get existing data. */ res = walrcv_exec(wrconn, "BEGIN READ ONLY ISOLATION LEVEL " @@ -849,10 +850,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* * Create new temporary logical decoding slot. * - * We'll use slot for data copy so make sure the snapshot - * is used for the transaction, that way the COPY will get - * data that is consistent with the lsn used by the slot - * to start decoding. + * We'll use slot for data copy so make sure the snapshot is + * used for the transaction, that way the COPY will get data + * that is consistent with the lsn used by the slot to start + * decoding. */ walrcv_create_slot(wrconn, slotname, true, CRS_USE_SNAPSHOT, origin_startpos); @@ -872,8 +873,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommandCounterIncrement(); /* - * We are done with the initial data synchronization, - * update the state. + * We are done with the initial data synchronization, update + * the state. */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; @@ -881,8 +882,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) SpinLockRelease(&MyLogicalRepWorker->relmutex); /* - * Wait for main apply worker to either tell us to - * catchup or that we are done. + * Wait for main apply worker to either tell us to catchup or + * that we are done. */ wait_for_sync_status_change(MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 04813b506e..9d1eab9e1e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -88,29 +88,29 @@ typedef struct FlushPosition { - dlist_node node; - XLogRecPtr local_end; - XLogRecPtr remote_end; + dlist_node node; + XLogRecPtr local_end; + XLogRecPtr remote_end; } FlushPosition; static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); typedef struct SlotErrCallbackArg { - LogicalRepRelation *rel; + LogicalRepRelation *rel; int attnum; } SlotErrCallbackArg; -static MemoryContext ApplyMessageContext = NULL; -MemoryContext ApplyContext = NULL; +static MemoryContext ApplyMessageContext = NULL; +MemoryContext ApplyContext = NULL; -WalReceiverConn *wrconn = NULL; +WalReceiverConn *wrconn = NULL; -Subscription *MySubscription = NULL; -bool MySubscriptionValid = false; +Subscription *MySubscription = NULL; +bool MySubscriptionValid = false; -bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +bool in_remote_transaction = false; +static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -215,7 +215,7 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) */ static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, - TupleTableSlot *slot) + TupleTableSlot *slot) { TupleDesc desc = RelationGetDescr(rel->localrel); int num_phys_attrs = desc->natts; @@ -271,9 +271,9 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, static void slot_store_error_callback(void *arg) { - SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; - Oid remotetypoid, - localtypoid; + SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; + Oid remotetypoid, + localtypoid; if (errarg->attnum < 0) return; @@ -295,12 +295,12 @@ slot_store_error_callback(void *arg) */ static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) + char **values) { - int natts = slot->tts_tupleDescriptor->natts; - int i; - SlotErrCallbackArg errarg; - ErrorContextCallback errcallback; + int natts = slot->tts_tupleDescriptor->natts; + int i; + SlotErrCallbackArg errarg; + ErrorContextCallback errcallback; ExecClearTuple(slot); @@ -315,14 +315,14 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, /* Call the "in" function for each non-dropped attribute */ for (i = 0; i < natts; i++) { - Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; - int remoteattnum = rel->attrmap[i]; + Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; + int remoteattnum = rel->attrmap[i]; if (!att->attisdropped && remoteattnum >= 0 && values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; + Oid typinput; + Oid typioparam; errarg.attnum = remoteattnum; @@ -359,12 +359,12 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, */ static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values, bool *replaces) + char **values, bool *replaces) { - int natts = slot->tts_tupleDescriptor->natts; - int i; - SlotErrCallbackArg errarg; - ErrorContextCallback errcallback; + int natts = slot->tts_tupleDescriptor->natts; + int i; + SlotErrCallbackArg errarg; + ErrorContextCallback errcallback; slot_getallattrs(slot); ExecClearTuple(slot); @@ -380,16 +380,16 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, /* Call the "in" function for each replaced attribute */ for (i = 0; i < natts; i++) { - Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; - int remoteattnum = rel->attrmap[i]; + Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; + int remoteattnum = rel->attrmap[i]; if (remoteattnum >= 0 && !replaces[remoteattnum]) continue; if (remoteattnum >= 0 && values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; + Oid typinput; + Oid typioparam; errarg.attnum = remoteattnum; @@ -418,7 +418,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, static void apply_handle_begin(StringInfo s) { - LogicalRepBeginData begin_data; + LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); @@ -437,7 +437,7 @@ apply_handle_begin(StringInfo s) static void apply_handle_commit(StringInfo s) { - LogicalRepCommitData commit_data; + LogicalRepCommitData commit_data; logicalrep_read_commit(s, &commit_data); @@ -476,8 +476,8 @@ static void apply_handle_origin(StringInfo s) { /* - * ORIGIN message can only come inside remote transaction and before - * any actual writes. + * ORIGIN message can only come inside remote transaction and before any + * actual writes. */ if (!in_remote_transaction || (IsTransactionState() && !am_tablesync_worker())) @@ -497,7 +497,7 @@ apply_handle_origin(StringInfo s) static void apply_handle_relation(StringInfo s) { - LogicalRepRelation *rel; + LogicalRepRelation *rel; rel = logicalrep_read_rel(s); logicalrep_relmap_update(rel); @@ -512,7 +512,7 @@ apply_handle_relation(StringInfo s) static void apply_handle_type(StringInfo s) { - LogicalRepTyp typ; + LogicalRepTyp typ; logicalrep_read_typ(s, &typ); logicalrep_typmap_update(&typ); @@ -526,7 +526,7 @@ apply_handle_type(StringInfo s) static Oid GetRelationIdentityOrPK(Relation rel) { - Oid idxoid; + Oid idxoid; idxoid = RelationGetReplicaIndex(rel); @@ -543,11 +543,11 @@ static void apply_handle_insert(StringInfo s) { LogicalRepRelMapEntry *rel; - LogicalRepTupleData newtup; - LogicalRepRelId relid; - EState *estate; - TupleTableSlot *remoteslot; - MemoryContext oldctx; + LogicalRepTupleData newtup; + LogicalRepRelId relid; + EState *estate; + TupleTableSlot *remoteslot; + MemoryContext oldctx; ensure_transaction(); @@ -607,15 +607,15 @@ check_relation_updatable(LogicalRepRelMapEntry *rel) return; /* - * We are in error mode so it's fine this is somewhat slow. - * It's better to give user correct error. + * We are in error mode so it's fine this is somewhat slow. It's better to + * give user correct error. */ if (OidIsValid(GetRelationIdentityOrPK(rel->localrel))) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publisher does not send replica identity column " - "expected by the logical replication target relation \"%s.%s\"", + "expected by the logical replication target relation \"%s.%s\"", rel->remoterel.nspname, rel->remoterel.relname))); } @@ -637,17 +637,17 @@ static void apply_handle_update(StringInfo s) { LogicalRepRelMapEntry *rel; - LogicalRepRelId relid; - Oid idxoid; - EState *estate; - EPQState epqstate; - LogicalRepTupleData oldtup; - LogicalRepTupleData newtup; - bool has_oldtup; - TupleTableSlot *localslot; - TupleTableSlot *remoteslot; - bool found; - MemoryContext oldctx; + LogicalRepRelId relid; + Oid idxoid; + EState *estate; + EPQState epqstate; + LogicalRepTupleData oldtup; + LogicalRepTupleData newtup; + bool has_oldtup; + TupleTableSlot *localslot; + TupleTableSlot *remoteslot; + bool found; + MemoryContext oldctx; ensure_transaction(); @@ -685,8 +685,8 @@ apply_handle_update(StringInfo s) MemoryContextSwitchTo(oldctx); /* - * Try to find tuple using either replica identity index, primary key - * or if needed, sequential scan. + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. */ idxoid = GetRelationIdentityOrPK(rel->localrel); Assert(OidIsValid(idxoid) || @@ -758,15 +758,15 @@ static void apply_handle_delete(StringInfo s) { LogicalRepRelMapEntry *rel; - LogicalRepTupleData oldtup; - LogicalRepRelId relid; - Oid idxoid; - EState *estate; - EPQState epqstate; - TupleTableSlot *remoteslot; - TupleTableSlot *localslot; - bool found; - MemoryContext oldctx; + LogicalRepTupleData oldtup; + LogicalRepRelId relid; + Oid idxoid; + EState *estate; + EPQState epqstate; + TupleTableSlot *remoteslot; + TupleTableSlot *localslot; + bool found; + MemoryContext oldctx; ensure_transaction(); @@ -802,8 +802,8 @@ apply_handle_delete(StringInfo s) MemoryContextSwitchTo(oldctx); /* - * Try to find tuple using either replica identity index, primary key - * or if needed, sequential scan. + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. */ idxoid = GetRelationIdentityOrPK(rel->localrel); Assert(OidIsValid(idxoid) || @@ -826,7 +826,7 @@ apply_handle_delete(StringInfo s) } else { - /* The tuple to be deleted could not be found.*/ + /* The tuple to be deleted could not be found. */ ereport(DEBUG1, (errmsg("logical replication could not find row for delete " "in replication target %s", @@ -856,46 +856,46 @@ apply_handle_delete(StringInfo s) static void apply_dispatch(StringInfo s) { - char action = pq_getmsgbyte(s); + char action = pq_getmsgbyte(s); switch (action) { - /* BEGIN */ + /* BEGIN */ case 'B': apply_handle_begin(s); break; - /* COMMIT */ + /* COMMIT */ case 'C': apply_handle_commit(s); break; - /* INSERT */ + /* INSERT */ case 'I': apply_handle_insert(s); break; - /* UPDATE */ + /* UPDATE */ case 'U': apply_handle_update(s); break; - /* DELETE */ + /* DELETE */ case 'D': apply_handle_delete(s); break; - /* RELATION */ + /* RELATION */ case 'R': apply_handle_relation(s); break; - /* TYPE */ + /* TYPE */ case 'Y': apply_handle_type(s); break; - /* ORIGIN */ + /* ORIGIN */ case 'O': apply_handle_origin(s); break; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid logical replication message type %c", action))); + errmsg("invalid logical replication message type %c", action))); } } @@ -925,7 +925,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, dlist_foreach_modify(iter, &lsn_mapping) { FlushPosition *pos = - dlist_container(FlushPosition, node, iter.cur); + dlist_container(FlushPosition, node, iter.cur); *write = pos->remote_end; @@ -995,12 +995,12 @@ static void LogicalRepApplyLoop(XLogRecPtr last_received) { /* - * Init the ApplyMessageContext which we clean up after each - * replication protocol message. + * Init the ApplyMessageContext which we clean up after each replication + * protocol message. */ ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1039,7 +1039,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } else { - int c; + int c; StringInfoData s; /* Reset timeout. */ @@ -1108,7 +1108,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { /* * If we didn't get any transactions for a while there might be - * unconsumed invalidation messages in the queue, consume them now. + * unconsumed invalidation messages in the queue, consume them + * now. */ AcceptInvalidationMessages(); if (!MySubscriptionValid) @@ -1126,6 +1127,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (endofstream) { TimeLineID tli; + walrcv_endstreaming(wrconn, &tli); break; } @@ -1152,19 +1154,18 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (rc & WL_TIMEOUT) { /* - * We didn't receive anything new. If we haven't heard - * anything from the server for more than - * wal_receiver_timeout / 2, ping the server. Also, if - * it's been longer than wal_receiver_status_interval - * since the last update we sent, send a status update to - * the master anyway, to report any progress in applying - * WAL. + * We didn't receive anything new. If we haven't heard anything + * from the server for more than wal_receiver_timeout / 2, ping + * the server. Also, if it's been longer than + * wal_receiver_status_interval since the last update we sent, + * send a status update to the master anyway, to report any + * progress in applying WAL. */ bool requestReply = false; /* - * Check if time since last receive from standby has - * reached the configured limit. + * Check if time since last receive from standby has reached the + * configured limit. */ if (wal_receiver_timeout > 0) { @@ -1180,13 +1181,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) (errmsg("terminating logical replication worker due to timeout"))); /* - * We didn't receive anything new, for half of - * receiver replication timeout. Ping the server. + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. */ if (!ping_sent) { timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); + (wal_receiver_timeout / 2)); if (now >= timeout) { requestReply = true; @@ -1211,17 +1212,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received) static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) { - static StringInfo reply_message = NULL; - static TimestampTz send_time = 0; + static StringInfo reply_message = NULL; + static TimestampTz send_time = 0; static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; static XLogRecPtr last_flushpos = InvalidXLogRecPtr; - XLogRecPtr writepos; - XLogRecPtr flushpos; + XLogRecPtr writepos; + XLogRecPtr flushpos; TimestampTz now; - bool have_pending_txes; + bool have_pending_txes; /* * If the user doesn't want status to be reported to the publisher, be @@ -1237,8 +1238,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) get_flush_position(&writepos, &flushpos, &have_pending_txes); /* - * No outstanding transactions to flush, we can report the latest - * received position. This is important for synchronous replication. + * No outstanding transactions to flush, we can report the latest received + * position. This is important for synchronous replication. */ if (!have_pending_txes) flushpos = writepos = recvpos; @@ -1262,7 +1263,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!reply_message) { - MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + reply_message = makeStringInfo(); MemoryContextSwitchTo(oldctx); } @@ -1273,7 +1275,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) pq_sendint64(reply_message, recvpos); /* write */ pq_sendint64(reply_message, flushpos); /* flush */ pq_sendint64(reply_message, writepos); /* apply */ - pq_sendint64(reply_message, now); /* sendTime */ + pq_sendint64(reply_message, now); /* sendTime */ pq_sendbyte(reply_message, requestReply); /* replyRequested */ elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X", @@ -1300,9 +1302,9 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static void reread_subscription(void) { - MemoryContext oldctx; - Subscription *newsub; - bool started_tx = false; + MemoryContext oldctx; + Subscription *newsub; + bool started_tx = false; /* This function might be called inside or outside of transaction. */ if (!IsTransactionState()) @@ -1317,47 +1319,45 @@ reread_subscription(void) newsub = GetSubscription(MyLogicalRepWorker->subid, true); /* - * Exit if the subscription was removed. - * This normally should not happen as the worker gets killed - * during DROP SUBSCRIPTION. + * Exit if the subscription was removed. This normally should not happen + * as the worker gets killed during DROP SUBSCRIPTION. */ if (!newsub) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "stop because the subscription was removed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "stop because the subscription was removed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); } /* - * Exit if the subscription was disabled. - * This normally should not happen as the worker gets killed - * during ALTER SUBSCRIPTION ... DISABLE. + * Exit if the subscription was disabled. This normally should not happen + * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE. */ if (!newsub->enabled) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "stop because the subscription was disabled", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); } /* - * Exit if connection string was changed. The launcher will start - * new worker. + * Exit if connection string was changed. The launcher will start new + * worker. */ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because the connection information was changed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because the connection information was changed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); @@ -1370,9 +1370,9 @@ reread_subscription(void) if (strcmp(newsub->name, MySubscription->name) != 0) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because subscription was renamed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because subscription was renamed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); @@ -1382,30 +1382,30 @@ reread_subscription(void) Assert(newsub->slotname); /* - * We need to make new connection to new slot if slot name has changed - * so exit here as well if that's the case. + * We need to make new connection to new slot if slot name has changed so + * exit here as well if that's the case. */ if (strcmp(newsub->slotname, MySubscription->slotname) != 0) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because the replication slot name was changed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because the replication slot name was changed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); } /* - * Exit if publication list was changed. The launcher will start - * new worker. + * Exit if publication list was changed. The launcher will start new + * worker. */ if (!equal(newsub->publications, MySubscription->publications)) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because subscription's publications were changed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because subscription's publications were changed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); @@ -1448,11 +1448,11 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) void ApplyWorkerMain(Datum main_arg) { - int worker_slot = DatumGetInt32(main_arg); - MemoryContext oldctx; - char originname[NAMEDATALEN]; - XLogRecPtr origin_startpos; - char *myslotname; + int worker_slot = DatumGetInt32(main_arg); + MemoryContext oldctx; + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos; + char *myslotname; WalRcvStreamOptions options; /* Attach to slot */ @@ -1488,8 +1488,8 @@ ApplyWorkerMain(Datum main_arg) /* Load the subscription into persistent memory context. */ ApplyContext = AllocSetContextCreate(TopMemoryContext, - "ApplyContext", - ALLOCSET_DEFAULT_SIZES); + "ApplyContext", + ALLOCSET_DEFAULT_SIZES); StartTransactionCommand(); oldctx = MemoryContextSwitchTo(ApplyContext); MySubscription = GetSubscription(MyLogicalRepWorker->subid, false); @@ -1503,9 +1503,9 @@ ApplyWorkerMain(Datum main_arg) if (!MySubscription->enabled) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will not " - "start because the subscription was disabled during startup", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will not " + "start because the subscription was disabled during startup", + MySubscription->name))); proc_exit(0); } @@ -1530,7 +1530,7 @@ ApplyWorkerMain(Datum main_arg) if (am_tablesync_worker()) { - char *syncslotname; + char *syncslotname; /* This is table synchroniation worker, call initial sync. */ syncslotname = LogicalRepSyncTableStart(&origin_startpos); @@ -1545,10 +1545,10 @@ ApplyWorkerMain(Datum main_arg) else { /* This is main apply worker */ - RepOriginId originid; - TimeLineID startpointTLI; - char *err; - int server_version; + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + int server_version; myslotname = MySubscription->slotname; @@ -1570,9 +1570,8 @@ ApplyWorkerMain(Datum main_arg) (errmsg("could not connect to the publisher: %s", err))); /* - * We don't really use the output identify_system for anything - * but it does some initializations on the upstream so let's still - * call it. + * We don't really use the output identify_system for anything but it + * does some initializations on the upstream so let's still call it. */ (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version); @@ -1580,8 +1579,8 @@ ApplyWorkerMain(Datum main_arg) } /* - * Setup callback for syscache so that we know when something - * changes in the subscription relation state. + * Setup callback for syscache so that we know when something changes in + * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 694f351dd8..5bdfa60ae7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -29,31 +29,31 @@ PG_MODULE_MAGIC; extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); -static void pgoutput_startup(LogicalDecodingContext * ctx, - OutputPluginOptions *opt, bool is_init); -static void pgoutput_shutdown(LogicalDecodingContext * ctx); +static void pgoutput_startup(LogicalDecodingContext *ctx, + OutputPluginOptions *opt, bool is_init); +static void pgoutput_shutdown(LogicalDecodingContext *ctx); static void pgoutput_begin_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn); + ReorderBufferTXN *txn); static void pgoutput_commit_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr commit_lsn); + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pgoutput_change(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, Relation rel, - ReorderBufferChange *change); + ReorderBufferTXN *txn, Relation rel, + ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, - RepOriginId origin_id); + RepOriginId origin_id); static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, - uint32 hashvalue); + uint32 hashvalue); /* Entry in the map used to remember which relation schemas we sent. */ typedef struct RelationSyncEntry { - Oid relid; /* relation oid */ - bool schema_sent; /* did we send the schema? */ - bool replicate_valid; + Oid relid; /* relation oid */ + bool schema_sent; /* did we send the schema? */ + bool replicate_valid; PublicationActions pubactions; } RelationSyncEntry; @@ -64,7 +64,7 @@ static void init_rel_sync_cache(MemoryContext decoding_context); static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, - uint32 hashvalue); + uint32 hashvalue); /* * Specify output plugin callbacks @@ -130,9 +130,9 @@ parse_output_parameters(List *options, uint32 *protocol_version, if (!SplitIdentifierString(strVal(defel->arg), ',', publication_names)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("invalid publication_names syntax"))); + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("invalid publication_names syntax"))); } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); @@ -143,14 +143,14 @@ parse_output_parameters(List *options, uint32 *protocol_version, * Initialize this plugin */ static void -pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, - bool is_init) +pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, + bool is_init) { - PGOutputData *data = palloc0(sizeof(PGOutputData)); + PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. */ data->context = AllocSetContextCreate(ctx->context, - "logical replication output context", + "logical replication output context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -175,15 +175,15 @@ pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, /* Check if we support requested protocol */ if (data->protocol_version != LOGICALREP_PROTO_VERSION_NUM) ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("client sent proto_version=%d but we only support protocol %d or lower", + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("client sent proto_version=%d but we only support protocol %d or lower", data->protocol_version, LOGICALREP_PROTO_VERSION_NUM))); if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM) ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("client sent proto_version=%d but we only support protocol %d or higher", - data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM))); + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("client sent proto_version=%d but we only support protocol %d or higher", + data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM))); if (list_length(data->publication_names) < 1) ereport(ERROR, @@ -208,14 +208,14 @@ pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { - bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); if (send_replication_origin) { - char *origin; + char *origin; /* Message boundary */ OutputPluginWrite(ctx, false); @@ -225,10 +225,10 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) * XXX: which behaviour do we want here? * * Alternatives: - * - don't send origin message if origin name not found - * (that's what we do now) - * - throw error - that will break replication, not good - * - send some special "unknown" origin + * - don't send origin message if origin name not found + * (that's what we do now) + * - throw error - that will break replication, not good + * - send some special "unknown" origin *---------- */ if (replorigin_by_oid(txn->origin_id, true, &origin)) @@ -243,7 +243,7 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) */ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn) { OutputPluginUpdateProgress(ctx); @@ -259,9 +259,9 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { - PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; - MemoryContext old; - RelationSyncEntry *relentry; + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + MemoryContext old; + RelationSyncEntry *relentry; relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); @@ -333,8 +333,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; case REORDER_BUFFER_CHANGE_UPDATE: { - HeapTuple oldtuple = change->data.tp.oldtuple ? - &change->data.tp.oldtuple->tuple : NULL; + HeapTuple oldtuple = change->data.tp.oldtuple ? + &change->data.tp.oldtuple->tuple : NULL; OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, relation, oldtuple, @@ -367,7 +367,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, - RepOriginId origin_id) + RepOriginId origin_id) { return false; } @@ -379,7 +379,7 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, * of the ctx->context so it will be cleaned up by logical decoding machinery. */ static void -pgoutput_shutdown(LogicalDecodingContext * ctx) +pgoutput_shutdown(LogicalDecodingContext *ctx) { if (RelationSyncCache) { @@ -397,10 +397,10 @@ LoadPublications(List *pubnames) List *result = NIL; ListCell *lc; - foreach (lc, pubnames) + foreach(lc, pubnames) { - char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + char *pubname = (char *) lfirst(lc); + Publication *pub = GetPublicationByName(pubname, false); result = lappend(result, pub); } @@ -417,9 +417,8 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) publications_valid = false; /* - * Also invalidate per-relation cache so that next time the filtering - * info is checked it will be updated with the new publication - * settings. + * Also invalidate per-relation cache so that next time the filtering info + * is checked it will be updated with the new publication settings. */ rel_sync_cache_publication_cb(arg, cacheid, hashvalue); } @@ -434,7 +433,7 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) static void init_rel_sync_cache(MemoryContext cachectx) { - HASHCTL ctl; + HASHCTL ctl; MemoryContext old_ctxt; if (RelationSyncCache != NULL) @@ -466,9 +465,9 @@ init_rel_sync_cache(MemoryContext cachectx) static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid) { - RelationSyncEntry *entry; - bool found; - MemoryContext oldctx; + RelationSyncEntry *entry; + bool found; + MemoryContext oldctx; Assert(RelationSyncCache != NULL); @@ -499,9 +498,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } /* - * Build publication cache. We can't use one provided by relcache - * as relcache considers all publications given relation is in, but - * here we only need to consider ones that the subscriber requested. + * Build publication cache. We can't use one provided by relcache as + * relcache considers all publications given relation is in, but here + * we only need to consider ones that the subscriber requested. */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = false; @@ -539,7 +538,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) static void rel_sync_cache_relation_cb(Datum arg, Oid relid) { - RelationSyncEntry *entry; + RelationSyncEntry *entry; /* * We can get here if the plugin was used in SQL interface as the @@ -558,15 +557,14 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) * safe point. * * Getting invalidations for relations that aren't in the table is - * entirely normal, since there's no way to unregister for an - * invalidation event. So we don't care if it's found or not. + * entirely normal, since there's no way to unregister for an invalidation + * event. So we don't care if it's found or not. */ entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid, HASH_FIND, NULL); /* - * Reset schema sent status as the relation definition may have - * changed. + * Reset schema sent status as the relation definition may have changed. */ if (entry != NULL) entry->schema_sent = false; @@ -578,8 +576,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) { - HASH_SEQ_STATUS status; - RelationSyncEntry *entry; + HASH_SEQ_STATUS status; + RelationSyncEntry *entry; /* * We can get here if the plugin was used in SQL interface as the @@ -590,8 +588,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) return; /* - * There is no way to find which entry in our cache the hash belongs to - * so mark the whole cache as invalid. + * There is no way to find which entry in our cache the hash belongs to so + * mark the whole cache as invalid. */ hash_seq_init(&status, RelationSyncCache); while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 5f63d0484a..5386e86aa6 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -502,8 +502,8 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) /* * Rename the slot directory on disk, so that we'll no longer recognize * this as a valid slot. Note that if this fails, we've got to mark the - * slot inactive before bailing out. If we're dropping an ephemeral or - * a temporary slot, we better never fail hard as the caller won't expect + * slot inactive before bailing out. If we're dropping an ephemeral or a + * temporary slot, we better never fail hard as the caller won't expect * the slot to survive and this might get called during error handling. */ if (rename(path, tmppath) == 0) @@ -839,8 +839,8 @@ restart: for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s; - char *slotname; - int active_pid; + char *slotname; + int active_pid; s = &ReplicationSlotCtl->replication_slots[i]; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 56a9ca9651..bbd26f3d6a 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -119,11 +119,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* * Acquire a logical decoding slot, this will check for conflicting names. - * Initially create persistent slot as ephemeral - that allows us to nicely - * handle errors during initialization because it'll get dropped if this - * transaction fails. We'll make it persistent at the end. - * Temporary slots can be created as temporary from beginning as they get - * dropped on error as well. + * Initially create persistent slot as ephemeral - that allows us to + * nicely handle errors during initialization because it'll get dropped if + * this transaction fails. We'll make it persistent at the end. Temporary + * slots can be created as temporary from beginning as they get dropped on + * error as well. */ ReplicationSlotCreate(NameStr(*name), true, temporary ? RS_TEMPORARY : RS_EPHEMERAL); @@ -132,7 +132,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * Create logical decoding context, to build the initial snapshot. */ ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, - false, /* do not build snapshot */ + false, /* do not build snapshot */ logical_read_local_xlog_page, NULL, NULL, NULL); @@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) Datum values[PG_GET_REPLICATION_SLOTS_COLS]; bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; - ReplicationSlotPersistency persistency; + ReplicationSlotPersistency persistency; TransactionId xmin; TransactionId catalog_xmin; XLogRecPtr restart_lsn; diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 554f783209..ad213fc454 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -102,17 +102,17 @@ static void SyncRepCancelWait(void); static int SyncRepWakeQueue(bool all, int mode); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, - XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - bool *am_sync); + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + bool *am_sync); static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, - XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - List *sync_standbys); + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + List *sync_standbys); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, - XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + List *sync_standbys, uint8 nth); static int SyncRepGetStandbyPriority(void); static List *SyncRepGetSyncStandbysPriority(bool *am_sync); static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); @@ -455,7 +455,7 @@ SyncRepReleaseWaiters(void) if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ereport(LOG, (errmsg("standby \"%s\" is now a synchronous standby with priority %u", - application_name, MyWalSnd->sync_standby_priority))); + application_name, MyWalSnd->sync_standby_priority))); else ereport(LOG, (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby", @@ -513,7 +513,7 @@ SyncRepReleaseWaiters(void) */ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, bool *am_sync) + XLogRecPtr *applyPtr, bool *am_sync) { List *sync_standbys; @@ -542,9 +542,9 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * oldest ones among sync standbys. In a quorum-based, they are the Nth * latest ones. * - * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest positions. - * But we use SyncRepGetOldestSyncRecPtr() for that calculation because - * it's a bit more efficient. + * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest + * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation + * because it's a bit more efficient. * * XXX If the numbers of current and requested sync standbys are the same, * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced @@ -572,15 +572,15 @@ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, List *sync_standbys) { - ListCell *cell; + ListCell *cell; /* - * Scan through all sync standbys and calculate the oldest - * Write, Flush and Apply positions. + * Scan through all sync standbys and calculate the oldest Write, Flush + * and Apply positions. */ - foreach (cell, sync_standbys) + foreach(cell, sync_standbys) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; @@ -606,23 +606,23 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, */ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) + XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) { - ListCell *cell; - XLogRecPtr *write_array; - XLogRecPtr *flush_array; - XLogRecPtr *apply_array; - int len; - int i = 0; + ListCell *cell; + XLogRecPtr *write_array; + XLogRecPtr *flush_array; + XLogRecPtr *apply_array; + int len; + int i = 0; len = list_length(sync_standbys); write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - foreach (cell, sync_standbys) + foreach(cell, sync_standbys) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; SpinLockAcquire(&walsnd->mutex); write_array[i] = walsnd->write; @@ -654,8 +654,8 @@ SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, static int cmp_lsn(const void *a, const void *b) { - XLogRecPtr lsn1 = *((const XLogRecPtr *) a); - XLogRecPtr lsn2 = *((const XLogRecPtr *) b); + XLogRecPtr lsn1 = *((const XLogRecPtr *) a); + XLogRecPtr lsn2 = *((const XLogRecPtr *) b); if (lsn1 > lsn2) return -1; @@ -674,7 +674,7 @@ cmp_lsn(const void *a, const void *b) * sync standby. Otherwise it's set to false. */ List * -SyncRepGetSyncStandbys(bool *am_sync) +SyncRepGetSyncStandbys(bool *am_sync) { /* Set default result */ if (am_sync != NULL) @@ -702,8 +702,8 @@ SyncRepGetSyncStandbys(bool *am_sync) static List * SyncRepGetSyncStandbysQuorum(bool *am_sync) { - List *result = NIL; - int i; + List *result = NIL; + int i; volatile WalSnd *walsnd; /* Use volatile pointer to prevent code * rearrangement */ @@ -730,8 +730,8 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* - * Consider this standby as a candidate for quorum sync standbys - * and append it to the result. + * Consider this standby as a candidate for quorum sync standbys and + * append it to the result. */ result = lappend_int(result, i); if (am_sync != NULL && walsnd == MyWalSnd) @@ -955,8 +955,8 @@ SyncRepGetStandbyPriority(void) return 0; /* - * In quorum-based sync replication, all the standbys in the list - * have the same priority, one. + * In quorum-based sync replication, all the standbys in the list have the + * same priority, one. */ return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 028170c952..2723612718 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1176,9 +1176,12 @@ XLogWalRcvSendHSFeedback(bool immed) { TimestampTz now; TransactionId nextXid; - uint32 xmin_epoch, catalog_xmin_epoch; - TransactionId xmin, catalog_xmin; + uint32 xmin_epoch, + catalog_xmin_epoch; + TransactionId xmin, + catalog_xmin; static TimestampTz sendTime = 0; + /* initially true so we always send at least one feedback message */ static bool master_has_standby_xmin = true; @@ -1211,8 +1214,8 @@ XLogWalRcvSendHSFeedback(bool immed) * * Bailing out here also ensures that we don't send feedback until we've * read our own replication slot state, so we don't tell the master to - * discard needed xmin or catalog_xmin from any slots that may exist - * on this replica. + * discard needed xmin or catalog_xmin from any slots that may exist on + * this replica. */ if (!HotStandbyActive()) return; @@ -1232,7 +1235,7 @@ XLogWalRcvSendHSFeedback(bool immed) * excludes the catalog_xmin. */ xmin = GetOldestXmin(NULL, - PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN); + PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN); ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin); @@ -1253,9 +1256,9 @@ XLogWalRcvSendHSFeedback(bool immed) GetNextXidAndEpoch(&nextXid, &xmin_epoch); catalog_xmin_epoch = xmin_epoch; if (nextXid < xmin) - xmin_epoch --; + xmin_epoch--; if (nextXid < catalog_xmin) - catalog_xmin_epoch --; + catalog_xmin_epoch--; elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u", xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a899841d83..49cce38880 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -197,7 +197,7 @@ static XLogRecPtr logical_startptr = InvalidXLogRecPtr; /* A sample associating a WAL location with the time it was written. */ typedef struct { - XLogRecPtr lsn; + XLogRecPtr lsn; TimestampTz time; } WalTimeSample; @@ -207,12 +207,12 @@ typedef struct /* A mechanism for tracking replication lag. */ static struct { - XLogRecPtr last_lsn; + XLogRecPtr last_lsn; WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; - int write_head; - int read_heads[NUM_SYNC_REP_WAIT_MODE]; + int write_head; + int read_heads[NUM_SYNC_REP_WAIT_MODE]; WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; -} LagTracker; +} LagTracker; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); @@ -530,7 +530,7 @@ StartReplication(StartReplicationCmd *cmd) if (ThisTimeLineID == 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); + errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); /* * We assume here that we're logging enough information in the WAL for @@ -580,8 +580,8 @@ StartReplication(StartReplicationCmd *cmd) sendTimeLineIsHistoric = true; /* - * Check that the timeline the client requested exists, and - * the requested start location is on that timeline. + * Check that the timeline the client requested exists, and the + * requested start location is on that timeline. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, @@ -599,8 +599,8 @@ StartReplication(StartReplicationCmd *cmd) * request to start replication from the beginning of the WAL * segment that contains switchpoint, but on the new timeline, so * that it doesn't end up with a partial segment. If you ask for - * too old a starting point, you'll get an error later when we fail - * to find the requested WAL segment in pg_wal. + * too old a starting point, you'll get an error later when we + * fail to find the requested WAL segment in pg_wal. * * XXX: we could be more strict here and only allow a startpoint * that's older than the switchpoint, if it's still in the same @@ -717,9 +717,9 @@ StartReplication(StartReplicationCmd *cmd) MemSet(nulls, false, sizeof(nulls)); /* - * Need a tuple descriptor representing two columns. - * int8 may seem like a surprising data type for this, but in theory - * int4 would not be wide enough for this, as TimeLineID is unsigned. + * Need a tuple descriptor representing two columns. int8 may seem + * like a surprising data type for this, but in theory int4 would not + * be wide enough for this, as TimeLineID is unsigned. */ tupdesc = CreateTemplateTupleDesc(2, false); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli", @@ -795,7 +795,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool reserve_wal_given = false; /* Parse options */ - foreach (lc, cmd->options) + foreach(lc, cmd->options) { DefElem *defel = (DefElem *) lfirst(lc); @@ -883,7 +883,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_LOGICAL) { LogicalDecodingContext *ctx; - bool need_full_snapshot = false; + bool need_full_snapshot = false; /* * Do options check early so that we can bail before calling the @@ -1255,10 +1255,10 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId TimestampTz now = GetCurrentTimestamp(); /* - * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS - * to avoid flooding the lag tracker when we commit frequently. + * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to + * avoid flooding the lag tracker when we commit frequently. */ -#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 +#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 if (!TimestampDifferenceExceeds(sendTime, now, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) return; @@ -1474,8 +1474,8 @@ exec_replication_command(const char *cmd_string) SnapBuildClearExportedSnapshot(); /* - * For aborted transactions, don't allow anything except pure SQL, - * the exec_simple_query() will handle it correctly. + * For aborted transactions, don't allow anything except pure SQL, the + * exec_simple_query() will handle it correctly. */ if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd)) ereport(ERROR, @@ -1744,7 +1744,7 @@ ProcessStandbyReplyMessage(void) bool clearLagTimes; TimestampTz now; - static bool fullyAppliedLastTime = false; + static bool fullyAppliedLastTime = false; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); @@ -1892,7 +1892,7 @@ TransactionIdInRecentPast(TransactionId xid, uint32 epoch) } if (!TransactionIdPrecedesOrEquals(xid, nextXid)) - return false; /* epoch OK, but it's wrapped around */ + return false; /* epoch OK, but it's wrapped around */ return true; } @@ -1974,8 +1974,8 @@ ProcessStandbyHSFeedbackMessage(void) * * If we're using a replication slot we reserve the xmin via that, * otherwise via the walsender's PGXACT entry. We can only track the - * catalog xmin separately when using a slot, so we store the least - * of the two provided when not using a slot. + * catalog xmin separately when using a slot, so we store the least of the + * two provided when not using a slot. * * XXX: It might make sense to generalize the ephemeral slot concept and * always use the slot mechanism to handle the feedback xmin. @@ -2155,8 +2155,8 @@ WalSndLoop(WalSndSendDataCallback send_data) } /* - * At the reception of SIGUSR2, switch the WAL sender to the stopping - * state. + * At the reception of SIGUSR2, switch the WAL sender to the + * stopping state. */ if (got_SIGUSR2) WalSndSetState(WALSNDSTATE_STOPPING); @@ -2588,18 +2588,18 @@ XLogSendPhysical(void) * it seems good enough to capture the time here. We should reach this * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that * may take some time, we read the WAL flush pointer and take the time - * very close to together here so that we'll get a later position if it - * is still moving. + * very close to together here so that we'll get a later position if it is + * still moving. * * Because LagTrackerWriter ignores samples when the LSN hasn't advanced, * this gives us a cheap approximation for the WAL flush time for this * LSN. * * Note that the LSN is not necessarily the LSN for the data contained in - * the present message; it's the end of the WAL, which might be - * further ahead. All the lag tracking machinery cares about is finding - * out when that arbitrary LSN is eventually reported as written, flushed - * and applied, so that it can measure the elapsed time. + * the present message; it's the end of the WAL, which might be further + * ahead. All the lag tracking machinery cares about is finding out when + * that arbitrary LSN is eventually reported as written, flushed and + * applied, so that it can measure the elapsed time. */ LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp()); @@ -2758,8 +2758,8 @@ XLogSendLogical(void) if (record != NULL) { /* - * Note the lack of any call to LagTrackerWrite() which is handled - * by WalSndUpdateProgress which is called by output plugin through + * Note the lack of any call to LagTrackerWrite() which is handled by + * WalSndUpdateProgress which is called by output plugin through * logical decoding write api. */ LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); @@ -2805,9 +2805,8 @@ WalSndDone(WalSndSendDataCallback send_data) /* * To figure out whether all WAL has successfully been replicated, check - * flush location if valid, write otherwise. Tools like pg_receivewal - * will usually (unless in synchronous mode) return an invalid flush - * location. + * flush location if valid, write otherwise. Tools like pg_receivewal will + * usually (unless in synchronous mode) return an invalid flush location. */ replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ? MyWalSnd->write : MyWalSnd->flush; @@ -3077,7 +3076,7 @@ WalSndWaitStopping(void) if (all_stopped) return; - pg_usleep(10000L); /* wait for 10 msec */ + pg_usleep(10000L); /* wait for 10 msec */ } } @@ -3123,7 +3122,7 @@ WalSndGetStateString(WalSndState state) static Interval * offset_to_interval(TimeOffset offset) { - Interval *result = palloc(sizeof(Interval)); + Interval *result = palloc(sizeof(Interval)); result->month = 0; result->day = 0; @@ -3360,9 +3359,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now) static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) { - bool buffer_full; - int new_write_head; - int i; + bool buffer_full; + int new_write_head; + int i; if (!am_walsender) return; @@ -3448,16 +3447,16 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) /* * We didn't cross a time. If there is a future sample that we * haven't reached yet, and we've already reached at least one sample, - * let's interpolate the local flushed time. This is mainly useful for - * reporting a completely stuck apply position as having increasing - * lag, since otherwise we'd have to wait for it to eventually start - * moving again and cross one of our samples before we can show the - * lag increasing. + * let's interpolate the local flushed time. This is mainly useful + * for reporting a completely stuck apply position as having + * increasing lag, since otherwise we'd have to wait for it to + * eventually start moving again and cross one of our samples before + * we can show the lag increasing. */ if (LagTracker.read_heads[head] != LagTracker.write_head && LagTracker.last_read[head].time != 0) { - double fraction; + double fraction; WalTimeSample prev = LagTracker.last_read[head]; WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]]; |