summaryrefslogtreecommitdiff
path: root/src/aof.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aof.c')
-rw-r--r--src/aof.c150
1 files changed, 97 insertions, 53 deletions
diff --git a/src/aof.c b/src/aof.c
index dadc34612..9d4587781 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -42,11 +42,13 @@
#include <sys/param.h>
void freeClientArgv(client *c);
-off_t getAppendOnlyFileSize(sds filename);
-off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am);
+off_t getAppendOnlyFileSize(sds filename, int *status);
+off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status);
int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am);
int aofFileExist(char *filename);
int rewriteAppendOnlyFile(char *filename);
+aofManifest *aofLoadManifestFromFile(sds am_filepath);
+void aofManifestFreeAndUpdate(aofManifest *am);
/* ----------------------------------------------------------------------------
* AOF Manifest file implementation.
@@ -226,13 +228,8 @@ sds getAofManifestAsString(aofManifest *am) {
* in order to support seamless upgrades from previous versions which did not
* use them.
*/
-#define MANIFEST_MAX_LINE 1024
void aofLoadManifestFromDisk(void) {
- const char *err = NULL;
- long long maxseq = 0;
-
server.aof_manifest = aofManifestCreate();
-
if (!dirExists(server.aof_dirname)) {
serverLog(LL_NOTICE, "The AOF directory %s doesn't exist", server.aof_dirname);
return;
@@ -247,16 +244,26 @@ void aofLoadManifestFromDisk(void) {
return;
}
+ aofManifest *am = aofLoadManifestFromFile(am_filepath);
+ if (am) aofManifestFreeAndUpdate(am);
+ sdsfree(am_name);
+ sdsfree(am_filepath);
+}
+
+/* Generic manifest loading function, used in `aofLoadManifestFromDisk` and redis-check-aof tool. */
+#define MANIFEST_MAX_LINE 1024
+aofManifest *aofLoadManifestFromFile(sds am_filepath) {
+ const char *err = NULL;
+ long long maxseq = 0;
+
+ aofManifest *am = aofManifestCreate();
FILE *fp = fopen(am_filepath, "r");
if (fp == NULL) {
serverLog(LL_WARNING, "Fatal error: can't open the AOF manifest "
- "file %s for reading: %s", am_name, strerror(errno));
+ "file %s for reading: %s", am_filepath, strerror(errno));
exit(1);
}
- sdsfree(am_name);
- sdsfree(am_filepath);
-
char buf[MANIFEST_MAX_LINE+1];
sds *argv = NULL;
int argc;
@@ -292,14 +299,14 @@ void aofLoadManifestFromDisk(void) {
line = sdstrim(sdsnew(buf), " \t\r\n");
if (!sdslen(line)) {
- err = "The AOF manifest file is invalid format";
+ err = "Invalid AOF manifest file format";
goto loaderr;
}
argv = sdssplitargs(line, &argc);
/* 'argc < 6' was done for forward compatibility. */
if (argv == NULL || argc < 6 || (argc % 2)) {
- err = "The AOF manifest file is invalid format";
+ err = "Invalid AOF manifest file format";
goto loaderr;
}
@@ -321,7 +328,7 @@ void aofLoadManifestFromDisk(void) {
/* We have to make sure we load all the information. */
if (!ai->file_name || !ai->file_seq || !ai->file_type) {
- err = "The AOF manifest file is invalid format";
+ err = "Invalid AOF manifest file format";
goto loaderr;
}
@@ -329,21 +336,21 @@ void aofLoadManifestFromDisk(void) {
argv = NULL;
if (ai->file_type == AOF_FILE_TYPE_BASE) {
- if (server.aof_manifest->base_aof_info) {
+ if (am->base_aof_info) {
err = "Found duplicate base file information";
goto loaderr;
}
- server.aof_manifest->base_aof_info = ai;
- server.aof_manifest->curr_base_file_seq = ai->file_seq;
+ am->base_aof_info = ai;
+ am->curr_base_file_seq = ai->file_seq;
} else if (ai->file_type == AOF_FILE_TYPE_HIST) {
- listAddNodeTail(server.aof_manifest->history_aof_list, ai);
+ listAddNodeTail(am->history_aof_list, ai);
} else if (ai->file_type == AOF_FILE_TYPE_INCR) {
if (ai->file_seq <= maxseq) {
err = "Found a non-monotonic sequence number";
goto loaderr;
}
- listAddNodeTail(server.aof_manifest->incr_aof_list, ai);
- server.aof_manifest->curr_incr_file_seq = ai->file_seq;
+ listAddNodeTail(am->incr_aof_list, ai);
+ am->curr_incr_file_seq = ai->file_seq;
maxseq = ai->file_seq;
} else {
err = "Unknown AOF file type";
@@ -356,7 +363,7 @@ void aofLoadManifestFromDisk(void) {
}
fclose(fp);
- return;
+ return am;
loaderr:
/* Sanitizer suppression: may report a false positive if we goto loaderr
@@ -627,7 +634,7 @@ void aofUpgradePrepare(aofManifest *am) {
server.aof_dirname,
strerror(errno));
sdsfree(aof_filepath);
- exit(1);;
+ exit(1);
}
sdsfree(aof_filepath);
@@ -721,7 +728,7 @@ void aofOpenIfNeededOnServerStart(void) {
exit(1);
}
- server.aof_last_incr_size = getAppendOnlyFileSize(aof_name);
+ server.aof_last_incr_size = getAppendOnlyFileSize(aof_name, NULL);
}
int aofFileExist(char *filename) {
@@ -1338,26 +1345,35 @@ int loadSingleAppendOnlyFile(char *filename) {
client *old_client = server.current_client;
fakeClient = server.current_client = createAOFClient();
- /* Check if this AOF file has an RDB preamble. In that case we need to
- * load the RDB file and later continue loading the AOF tail. */
+ /* Check if the AOF file is in RDB format (it may be RDB encoded base AOF
+ * or old style RDB-preamble AOF). In that case we need to load the RDB file
+ * and later continue loading the AOF tail if it is an old style RDB-preamble AOF. */
char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
- /* No RDB preamble, seek back at 0 offset. */
+ /* Not in RDB format, seek back at 0 offset. */
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
- /* RDB preamble. Pass loading the RDB functions. */
+ /* RDB format. Pass loading the RDB functions. */
rio rdb;
+ int old_style = !strcmp(filename, server.aof_filename);
+ if (old_style)
+ serverLog(LL_NOTICE, "Reading RDB preamble from AOF file...");
+ else
+ serverLog(LL_NOTICE, "Reading RDB base file on AOF loading...");
- serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
- serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename);
+ if (old_style)
+ serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename);
+ else
+ serverLog(LL_WARNING, "Error reading the RDB base file %s, AOF loading aborted", filename);
+
goto readerr;
} else {
loadingAbsProgress(ftello(fp));
last_progress_report_size = ftello(fp);
- serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
+ if (old_style) serverLog(LL_NOTICE, "Reading the remaining AOF tail...");
}
}
@@ -1517,15 +1533,15 @@ uxeof: /* Unexpected AOF end of file. */
}
}
}
- serverLog(LL_WARNING,"Unexpected end of file reading the append only file %s. You can: \
- 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. \
- 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.", filename);
+ serverLog(LL_WARNING, "Unexpected end of file reading the append only file %s. You can: "
+ "1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename.manifest>. "
+ "2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.", filename);
ret = AOF_FAILED;
goto cleanup;
fmterr: /* Format error. */
- serverLog(LL_WARNING,"Bad file format reading the append only file %s: \
- make a backup of your AOF file, then use ./redis-check-aof --fix <filename>", filename);
+ serverLog(LL_WARNING, "Bad file format reading the append only file %s: "
+ "make a backup of your AOF file, then use ./redis-check-aof --fix <filename.manifest>", filename);
ret = AOF_FAILED;
/* fall through to cleanup. */
@@ -1540,7 +1556,7 @@ cleanup:
/* Load the AOF files according the aofManifest pointed by am. */
int loadAppendOnlyFiles(aofManifest *am) {
serverAssert(am != NULL);
- int ret = C_OK;
+ int status, ret = C_OK;
long long start;
off_t total_size = 0;
sds aof_name;
@@ -1574,7 +1590,16 @@ int loadAppendOnlyFiles(aofManifest *am) {
/* Here we calculate the total size of all BASE and INCR files in
* advance, it will be set to `server.loading_total_bytes`. */
- total_size = getBaseAndIncrAppendOnlyFilesSize(am);
+ total_size = getBaseAndIncrAppendOnlyFilesSize(am, &status);
+ if (status != AOF_OK) {
+ /* If an AOF exists in the manifest but not on the disk, we consider this to be a fatal error. */
+ if (status == AOF_NOT_EXIST) status = AOF_FAILED;
+
+ return status;
+ } else if (total_size == 0) {
+ return AOF_EMPTY;
+ }
+
startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0);
/* Load BASE AOF if needed. */
@@ -1590,9 +1615,8 @@ int loadAppendOnlyFiles(aofManifest *am) {
aof_name, (float)(ustime()-start)/1000000);
}
- /* If an AOF exists in the manifest but not on the disk, Or the truncated
- * file is not the last file, we consider this to be a fatal error. */
- if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) {
+ /* If the truncated file is not the last file, we consider this to be a fatal error. */
+ if (ret == AOF_TRUNCATED && !last_file) {
ret = AOF_FAILED;
}
@@ -1620,7 +1644,11 @@ int loadAppendOnlyFiles(aofManifest *am) {
aof_name, (float)(ustime()-start)/1000000);
}
- if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) {
+ /* We know that (at least) one of the AOF files has data (total_size > 0),
+ * so empty incr AOF file doesn't count as a AOF_EMPTY result */
+ if (ret == AOF_EMPTY) ret = AOF_OK;
+
+ if (ret == AOF_TRUNCATED && !last_file) {
ret = AOF_FAILED;
}
@@ -1635,7 +1663,7 @@ int loadAppendOnlyFiles(aofManifest *am) {
server.aof_fsync_offset = server.aof_current_size;
cleanup:
- stopLoading(ret == AOF_OK);
+ stopLoading(ret == AOF_OK || ret == AOF_TRUNCATED);
return ret;
}
@@ -2007,10 +2035,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Append XSETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
- if (!rioWriteBulkCount(r,'*',3) ||
+ if (!rioWriteBulkCount(r,'*',7) ||
!rioWriteBulkString(r,"XSETID",6) ||
!rioWriteBulkObject(r,key) ||
- !rioWriteBulkStreamID(r,&s->last_id))
+ !rioWriteBulkStreamID(r,&s->last_id) ||
+ !rioWriteBulkString(r,"ENTRIESADDED",12) ||
+ !rioWriteBulkLongLong(r,s->entries_added) ||
+ !rioWriteBulkString(r,"MAXDELETEDID",12) ||
+ !rioWriteBulkStreamID(r,&s->max_deleted_entry_id))
{
streamIteratorStop(&si);
return 0;
@@ -2025,12 +2057,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
while(raxNext(&ri)) {
streamCG *group = ri.data;
/* Emit the XGROUP CREATE in order to create the group. */
- if (!rioWriteBulkCount(r,'*',5) ||
+ if (!rioWriteBulkCount(r,'*',7) ||
!rioWriteBulkString(r,"XGROUP",6) ||
!rioWriteBulkString(r,"CREATE",6) ||
!rioWriteBulkObject(r,key) ||
!rioWriteBulkString(r,(char*)ri.key,ri.key_len) ||
- !rioWriteBulkStreamID(r,&group->last_id))
+ !rioWriteBulkStreamID(r,&group->last_id) ||
+ !rioWriteBulkString(r,"ENTRIESREAD",11) ||
+ !rioWriteBulkLongLong(r,group->entries_read))
{
raxStop(&ri);
streamIteratorStop(&si);
@@ -2332,7 +2366,7 @@ int rewriteAppendOnlyFileBackground(void) {
server.aof_selected_db = -1;
flushAppendOnlyFile(1);
if (openNewIncrAofForAppend() != C_OK) return C_ERR;
-
+ server.stat_aof_rewrites++;
if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
char tmpfile[256];
@@ -2388,7 +2422,10 @@ void aofRemoveTempFile(pid_t childpid) {
bg_unlink(tmpfile);
}
-off_t getAppendOnlyFileSize(sds filename) {
+/* Get size of an AOF file.
+ * The status argument is an optional output argument to be filled with
+ * one of the AOF_ status values. */
+off_t getAppendOnlyFileSize(sds filename, int *status) {
struct redis_stat sb;
off_t size;
mstime_t latency;
@@ -2396,10 +2433,12 @@ off_t getAppendOnlyFileSize(sds filename) {
sds aof_filepath = makePath(server.aof_dirname, filename);
latencyStartMonitor(latency);
if (redis_stat(aof_filepath, &sb) == -1) {
+ if (status) *status = errno == ENOENT ? AOF_NOT_EXIST : AOF_OPEN_ERR;
serverLog(LL_WARNING, "Unable to obtain the AOF file %s length. stat: %s",
filename, strerror(errno));
size = 0;
} else {
+ if (status) *status = AOF_OK;
size = sb.st_size;
}
latencyEndMonitor(latency);
@@ -2408,22 +2447,27 @@ off_t getAppendOnlyFileSize(sds filename) {
return size;
}
-off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am) {
+/* Get size of all AOF files referred by the manifest (excluding history).
+ * The status argument is an output argument to be filled with
+ * one of the AOF_ status values. */
+off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status) {
off_t size = 0;
-
listNode *ln;
listIter li;
if (am->base_aof_info) {
serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
- size += getAppendOnlyFileSize(am->base_aof_info->file_name);
+
+ size += getAppendOnlyFileSize(am->base_aof_info->file_name, status);
+ if (*status != AOF_OK) return 0;
}
listRewind(am->incr_aof_list, &li);
while ((ln = listNext(&li)) != NULL) {
aofInfo *ai = (aofInfo*)ln->value;
serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);
- size += getAppendOnlyFileSize(ai->file_name);
+ size += getAppendOnlyFileSize(ai->file_name, status);
+ if (*status != AOF_OK) return 0;
}
return size;
@@ -2497,7 +2541,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (server.aof_fd != -1) {
/* AOF enabled. */
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
- server.aof_current_size = getAppendOnlyFileSize(new_base_filename) + server.aof_last_incr_size;
+ server.aof_current_size = getAppendOnlyFileSize(new_base_filename, NULL) + server.aof_last_incr_size;
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;