summaryrefslogtreecommitdiff
path: root/src/rdb.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rdb.c')
-rw-r--r--src/rdb.c79
1 files changed, 71 insertions, 8 deletions
diff --git a/src/rdb.c b/src/rdb.c
index ac5aa1f86..d5f853dd8 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -692,7 +692,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
- return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
+ return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
@@ -986,6 +986,19 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
nwritten += n;
+ /* Save the first entry ID. */
+ if ((n = rdbSaveLen(rdb,s->first_id.ms)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveLen(rdb,s->first_id.seq)) == -1) return -1;
+ nwritten += n;
+ /* Save the maximal tombstone ID. */
+ if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.ms)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.seq)) == -1) return -1;
+ nwritten += n;
+ /* Save the offset. */
+ if ((n = rdbSaveLen(rdb,s->entries_added)) == -1) return -1;
+ nwritten += n;
/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */
@@ -1020,6 +1033,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
return -1;
}
nwritten += n;
+
+ /* Save the group's logical reads counter. */
+ if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) {
+ raxStop(&ri);
+ return -1;
+ }
+ nwritten += n;
/* Save the global PEL. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
@@ -1151,8 +1171,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
+ UNUSED(rdbflags);
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
- int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
+ int aof_base = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
@@ -1169,7 +1190,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
== -1) return -1;
}
- if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
+ if (rdbSaveAuxFieldStrInt(rdb, "aof-base", aof_base) == -1) return -1;
return 1;
}
@@ -1470,6 +1491,7 @@ int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
if (hasActiveChildProcess()) return C_ERR;
+ server.stat_rdb_saves++;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
@@ -2319,7 +2341,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
- } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
+ } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
o = createStreamObject();
stream *s = o->ptr;
uint64_t listpacks = rdbLoadLen(rdb,NULL);
@@ -2395,6 +2417,30 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
/* Load the last entry ID. */
s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL);
+
+ if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
+ /* Load the first entry ID. */
+ s->first_id.ms = rdbLoadLen(rdb,NULL);
+ s->first_id.seq = rdbLoadLen(rdb,NULL);
+
+ /* Load the maximal deleted entry ID. */
+ s->max_deleted_entry_id.ms = rdbLoadLen(rdb,NULL);
+ s->max_deleted_entry_id.seq = rdbLoadLen(rdb,NULL);
+
+ /* Load the offset. */
+ s->entries_added = rdbLoadLen(rdb,NULL);
+ } else {
+ /* During migration the offset can be initialized to the stream's
+ * length. At this point, we also don't care about tombstones
+ * because CG offsets will be later initialized as well. */
+ s->max_deleted_entry_id.ms = 0;
+ s->max_deleted_entry_id.seq = 0;
+ s->entries_added = s->length;
+
+ /* Since the rax is already loaded, we can find the first entry's
+ * ID. */
+ streamGetEdgeID(s,1,1,&s->first_id);
+ }
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream object metadata loading failed.");
@@ -2430,8 +2476,22 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
decrRefCount(o);
return NULL;
}
+
+ /* Load group offset. */
+ uint64_t cg_offset;
+ if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
+ cg_offset = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream cgroup offset loading failed.");
+ sdsfree(cgname);
+ decrRefCount(o);
+ return NULL;
+ }
+ } else {
+ cg_offset = streamEstimateDistanceFromFirstEverEntry(s,&cg_id);
+ }
- streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
+ streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id,cg_offset);
if (cgroup == NULL) {
rdbReportCorruptRDB("Duplicated consumer group name %s",
cgname);
@@ -2962,6 +3022,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
} else if (!strcasecmp(auxkey->ptr,"aof-preamble")) {
long long haspreamble = strtoll(auxval->ptr,NULL,10);
if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail");
+ } else if (!strcasecmp(auxkey->ptr, "aof-base")) {
+ long long isbase = strtoll(auxval->ptr, NULL, 10);
+ if (isbase) serverLog(LL_NOTICE, "RDB is base AOF");
} else if (!strcasecmp(auxkey->ptr,"redis-bits")) {
/* Just ignored. */
} else {
@@ -3049,9 +3112,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave.
- * Similarly if the RDB is the preamble of an AOF file, we want to
- * load all the keys as they are, since the log of operations later
- * assume to work in an exact keyspace state. */
+ * Similarly, if the base AOF is RDB format, we want to load all
+ * the keys they are, since the log of operations in the incr AOF
+ * is assumed to work in the exact keyspace state. */
if (val == NULL) {
/* Since we used to have bug that could lead to empty keys
* (See #8453), we rather not fail when empty key is encountered