diff options
Diffstat (limited to 'src/rdb.c')
-rw-r--r-- | src/rdb.c | 79 |
1 files changed, 71 insertions, 8 deletions
@@ -692,7 +692,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) { else serverPanic("Unknown hash encoding"); case OBJ_STREAM: - return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS); + return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2); case OBJ_MODULE: return rdbSaveType(rdb,RDB_TYPE_MODULE_2); default: @@ -986,6 +986,19 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { nwritten += n; if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; nwritten += n; + /* Save the first entry ID. */ + if ((n = rdbSaveLen(rdb,s->first_id.ms)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,s->first_id.seq)) == -1) return -1; + nwritten += n; + /* Save the maximal tombstone ID. */ + if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.ms)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.seq)) == -1) return -1; + nwritten += n; + /* Save the offset. */ + if ((n = rdbSaveLen(rdb,s->entries_added)) == -1) return -1; + nwritten += n; /* The consumer groups and their clients are part of the stream * type, so serialize every consumer group. */ @@ -1020,6 +1033,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { return -1; } nwritten += n; + + /* Save the group's logical reads counter. */ + if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) { + raxStop(&ri); + return -1; + } + nwritten += n; /* Save the global PEL. */ if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) { @@ -1151,8 +1171,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { /* Save a few default AUX fields with information about the RDB generated. */ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { + UNUSED(rdbflags); int redis_bits = (sizeof(void*) == 8) ? 64 : 32; - int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0; + int aof_base = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0; /* Add a few fields about the state when the RDB was created. */ if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; @@ -1169,7 +1190,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1) return -1; } - if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; + if (rdbSaveAuxFieldStrInt(rdb, "aof-base", aof_base) == -1) return -1; return 1; } @@ -1470,6 +1491,7 @@ int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) { pid_t childpid; if (hasActiveChildProcess()) return C_ERR; + server.stat_rdb_saves++; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL); @@ -2319,7 +2341,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } - } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) { + } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { o = createStreamObject(); stream *s = o->ptr; uint64_t listpacks = rdbLoadLen(rdb,NULL); @@ -2395,6 +2417,30 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { /* Load the last entry ID. */ s->last_id.ms = rdbLoadLen(rdb,NULL); s->last_id.seq = rdbLoadLen(rdb,NULL); + + if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { + /* Load the first entry ID. */ + s->first_id.ms = rdbLoadLen(rdb,NULL); + s->first_id.seq = rdbLoadLen(rdb,NULL); + + /* Load the maximal deleted entry ID. */ + s->max_deleted_entry_id.ms = rdbLoadLen(rdb,NULL); + s->max_deleted_entry_id.seq = rdbLoadLen(rdb,NULL); + + /* Load the offset. */ + s->entries_added = rdbLoadLen(rdb,NULL); + } else { + /* During migration the offset can be initialized to the stream's + * length. At this point, we also don't care about tombstones + * because CG offsets will be later initialized as well. */ + s->max_deleted_entry_id.ms = 0; + s->max_deleted_entry_id.seq = 0; + s->entries_added = s->length; + + /* Since the rax is already loaded, we can find the first entry's + * ID. */ + streamGetEdgeID(s,1,1,&s->first_id); + } if (rioGetReadError(rdb)) { rdbReportReadError("Stream object metadata loading failed."); @@ -2430,8 +2476,22 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { decrRefCount(o); return NULL; } + + /* Load group offset. */ + uint64_t cg_offset; + if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { + cg_offset = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream cgroup offset loading failed."); + sdsfree(cgname); + decrRefCount(o); + return NULL; + } + } else { + cg_offset = streamEstimateDistanceFromFirstEverEntry(s,&cg_id); + } - streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); + streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id,cg_offset); if (cgroup == NULL) { rdbReportCorruptRDB("Duplicated consumer group name %s", cgname); @@ -2962,6 +3022,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } else if (!strcasecmp(auxkey->ptr,"aof-preamble")) { long long haspreamble = strtoll(auxval->ptr,NULL,10); if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail"); + } else if (!strcasecmp(auxkey->ptr, "aof-base")) { + long long isbase = strtoll(auxval->ptr, NULL, 10); + if (isbase) serverLog(LL_NOTICE, "RDB is base AOF"); } else if (!strcasecmp(auxkey->ptr,"redis-bits")) { /* Just ignored. */ } else { @@ -3049,9 +3112,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. - * Similarly if the RDB is the preamble of an AOF file, we want to - * load all the keys as they are, since the log of operations later - * assume to work in an exact keyspace state. */ + * Similarly, if the base AOF is RDB format, we want to load all + * the keys they are, since the log of operations in the incr AOF + * is assumed to work in the exact keyspace state. */ if (val == NULL) { /* Since we used to have bug that could lead to empty keys * (See #8453), we rather not fail when empty key is encountered |