diff options
Diffstat (limited to 'src/aof.c')
-rw-r--r-- | src/aof.c | 150 |
1 files changed, 97 insertions, 53 deletions
@@ -42,11 +42,13 @@ #include <sys/param.h> void freeClientArgv(client *c); -off_t getAppendOnlyFileSize(sds filename); -off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am); +off_t getAppendOnlyFileSize(sds filename, int *status); +off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status); int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am); int aofFileExist(char *filename); int rewriteAppendOnlyFile(char *filename); +aofManifest *aofLoadManifestFromFile(sds am_filepath); +void aofManifestFreeAndUpdate(aofManifest *am); /* ---------------------------------------------------------------------------- * AOF Manifest file implementation. @@ -226,13 +228,8 @@ sds getAofManifestAsString(aofManifest *am) { * in order to support seamless upgrades from previous versions which did not * use them. */ -#define MANIFEST_MAX_LINE 1024 void aofLoadManifestFromDisk(void) { - const char *err = NULL; - long long maxseq = 0; - server.aof_manifest = aofManifestCreate(); - if (!dirExists(server.aof_dirname)) { serverLog(LL_NOTICE, "The AOF directory %s doesn't exist", server.aof_dirname); return; @@ -247,16 +244,26 @@ void aofLoadManifestFromDisk(void) { return; } + aofManifest *am = aofLoadManifestFromFile(am_filepath); + if (am) aofManifestFreeAndUpdate(am); + sdsfree(am_name); + sdsfree(am_filepath); +} + +/* Generic manifest loading function, used in `aofLoadManifestFromDisk` and redis-check-aof tool. */ +#define MANIFEST_MAX_LINE 1024 +aofManifest *aofLoadManifestFromFile(sds am_filepath) { + const char *err = NULL; + long long maxseq = 0; + + aofManifest *am = aofManifestCreate(); FILE *fp = fopen(am_filepath, "r"); if (fp == NULL) { serverLog(LL_WARNING, "Fatal error: can't open the AOF manifest " - "file %s for reading: %s", am_name, strerror(errno)); + "file %s for reading: %s", am_filepath, strerror(errno)); exit(1); } - sdsfree(am_name); - sdsfree(am_filepath); - char buf[MANIFEST_MAX_LINE+1]; sds *argv = NULL; int argc; @@ -292,14 +299,14 @@ void aofLoadManifestFromDisk(void) { line = sdstrim(sdsnew(buf), " \t\r\n"); if (!sdslen(line)) { - err = "The AOF manifest file is invalid format"; + err = "Invalid AOF manifest file format"; goto loaderr; } argv = sdssplitargs(line, &argc); /* 'argc < 6' was done for forward compatibility. */ if (argv == NULL || argc < 6 || (argc % 2)) { - err = "The AOF manifest file is invalid format"; + err = "Invalid AOF manifest file format"; goto loaderr; } @@ -321,7 +328,7 @@ void aofLoadManifestFromDisk(void) { /* We have to make sure we load all the information. */ if (!ai->file_name || !ai->file_seq || !ai->file_type) { - err = "The AOF manifest file is invalid format"; + err = "Invalid AOF manifest file format"; goto loaderr; } @@ -329,21 +336,21 @@ void aofLoadManifestFromDisk(void) { argv = NULL; if (ai->file_type == AOF_FILE_TYPE_BASE) { - if (server.aof_manifest->base_aof_info) { + if (am->base_aof_info) { err = "Found duplicate base file information"; goto loaderr; } - server.aof_manifest->base_aof_info = ai; - server.aof_manifest->curr_base_file_seq = ai->file_seq; + am->base_aof_info = ai; + am->curr_base_file_seq = ai->file_seq; } else if (ai->file_type == AOF_FILE_TYPE_HIST) { - listAddNodeTail(server.aof_manifest->history_aof_list, ai); + listAddNodeTail(am->history_aof_list, ai); } else if (ai->file_type == AOF_FILE_TYPE_INCR) { if (ai->file_seq <= maxseq) { err = "Found a non-monotonic sequence number"; goto loaderr; } - listAddNodeTail(server.aof_manifest->incr_aof_list, ai); - server.aof_manifest->curr_incr_file_seq = ai->file_seq; + listAddNodeTail(am->incr_aof_list, ai); + am->curr_incr_file_seq = ai->file_seq; maxseq = ai->file_seq; } else { err = "Unknown AOF file type"; @@ -356,7 +363,7 @@ void aofLoadManifestFromDisk(void) { } fclose(fp); - return; + return am; loaderr: /* Sanitizer suppression: may report a false positive if we goto loaderr @@ -627,7 +634,7 @@ void aofUpgradePrepare(aofManifest *am) { server.aof_dirname, strerror(errno)); sdsfree(aof_filepath); - exit(1);; + exit(1); } sdsfree(aof_filepath); @@ -721,7 +728,7 @@ void aofOpenIfNeededOnServerStart(void) { exit(1); } - server.aof_last_incr_size = getAppendOnlyFileSize(aof_name); + server.aof_last_incr_size = getAppendOnlyFileSize(aof_name, NULL); } int aofFileExist(char *filename) { @@ -1338,26 +1345,35 @@ int loadSingleAppendOnlyFile(char *filename) { client *old_client = server.current_client; fakeClient = server.current_client = createAOFClient(); - /* Check if this AOF file has an RDB preamble. In that case we need to - * load the RDB file and later continue loading the AOF tail. */ + /* Check if the AOF file is in RDB format (it may be RDB encoded base AOF + * or old style RDB-preamble AOF). In that case we need to load the RDB file + * and later continue loading the AOF tail if it is an old style RDB-preamble AOF. */ char sig[5]; /* "REDIS" */ if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) { - /* No RDB preamble, seek back at 0 offset. */ + /* Not in RDB format, seek back at 0 offset. */ if (fseek(fp,0,SEEK_SET) == -1) goto readerr; } else { - /* RDB preamble. Pass loading the RDB functions. */ + /* RDB format. Pass loading the RDB functions. */ rio rdb; + int old_style = !strcmp(filename, server.aof_filename); + if (old_style) + serverLog(LL_NOTICE, "Reading RDB preamble from AOF file..."); + else + serverLog(LL_NOTICE, "Reading RDB base file on AOF loading..."); - serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { - serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename); + if (old_style) + serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename); + else + serverLog(LL_WARNING, "Error reading the RDB base file %s, AOF loading aborted", filename); + goto readerr; } else { loadingAbsProgress(ftello(fp)); last_progress_report_size = ftello(fp); - serverLog(LL_NOTICE,"Reading the remaining AOF tail..."); + if (old_style) serverLog(LL_NOTICE, "Reading the remaining AOF tail..."); } } @@ -1517,15 +1533,15 @@ uxeof: /* Unexpected AOF end of file. */ } } } - serverLog(LL_WARNING,"Unexpected end of file reading the append only file %s. You can: \ - 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. \ - 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.", filename); + serverLog(LL_WARNING, "Unexpected end of file reading the append only file %s. You can: " + "1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename.manifest>. " + "2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.", filename); ret = AOF_FAILED; goto cleanup; fmterr: /* Format error. */ - serverLog(LL_WARNING,"Bad file format reading the append only file %s: \ - make a backup of your AOF file, then use ./redis-check-aof --fix <filename>", filename); + serverLog(LL_WARNING, "Bad file format reading the append only file %s: " + "make a backup of your AOF file, then use ./redis-check-aof --fix <filename.manifest>", filename); ret = AOF_FAILED; /* fall through to cleanup. */ @@ -1540,7 +1556,7 @@ cleanup: /* Load the AOF files according the aofManifest pointed by am. */ int loadAppendOnlyFiles(aofManifest *am) { serverAssert(am != NULL); - int ret = C_OK; + int status, ret = C_OK; long long start; off_t total_size = 0; sds aof_name; @@ -1574,7 +1590,16 @@ int loadAppendOnlyFiles(aofManifest *am) { /* Here we calculate the total size of all BASE and INCR files in * advance, it will be set to `server.loading_total_bytes`. */ - total_size = getBaseAndIncrAppendOnlyFilesSize(am); + total_size = getBaseAndIncrAppendOnlyFilesSize(am, &status); + if (status != AOF_OK) { + /* If an AOF exists in the manifest but not on the disk, we consider this to be a fatal error. */ + if (status == AOF_NOT_EXIST) status = AOF_FAILED; + + return status; + } else if (total_size == 0) { + return AOF_EMPTY; + } + startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0); /* Load BASE AOF if needed. */ @@ -1590,9 +1615,8 @@ int loadAppendOnlyFiles(aofManifest *am) { aof_name, (float)(ustime()-start)/1000000); } - /* If an AOF exists in the manifest but not on the disk, Or the truncated - * file is not the last file, we consider this to be a fatal error. */ - if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) { + /* If the truncated file is not the last file, we consider this to be a fatal error. */ + if (ret == AOF_TRUNCATED && !last_file) { ret = AOF_FAILED; } @@ -1620,7 +1644,11 @@ int loadAppendOnlyFiles(aofManifest *am) { aof_name, (float)(ustime()-start)/1000000); } - if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) { + /* We know that (at least) one of the AOF files has data (total_size > 0), + * so empty incr AOF file doesn't count as a AOF_EMPTY result */ + if (ret == AOF_EMPTY) ret = AOF_OK; + + if (ret == AOF_TRUNCATED && !last_file) { ret = AOF_FAILED; } @@ -1635,7 +1663,7 @@ int loadAppendOnlyFiles(aofManifest *am) { server.aof_fsync_offset = server.aof_current_size; cleanup: - stopLoading(ret == AOF_OK); + stopLoading(ret == AOF_OK || ret == AOF_TRUNCATED); return ret; } @@ -2007,10 +2035,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Append XSETID after XADD, make sure lastid is correct, * in case of XDEL lastid. */ - if (!rioWriteBulkCount(r,'*',3) || + if (!rioWriteBulkCount(r,'*',7) || !rioWriteBulkString(r,"XSETID",6) || !rioWriteBulkObject(r,key) || - !rioWriteBulkStreamID(r,&s->last_id)) + !rioWriteBulkStreamID(r,&s->last_id) || + !rioWriteBulkString(r,"ENTRIESADDED",12) || + !rioWriteBulkLongLong(r,s->entries_added) || + !rioWriteBulkString(r,"MAXDELETEDID",12) || + !rioWriteBulkStreamID(r,&s->max_deleted_entry_id)) { streamIteratorStop(&si); return 0; @@ -2025,12 +2057,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { while(raxNext(&ri)) { streamCG *group = ri.data; /* Emit the XGROUP CREATE in order to create the group. */ - if (!rioWriteBulkCount(r,'*',5) || + if (!rioWriteBulkCount(r,'*',7) || !rioWriteBulkString(r,"XGROUP",6) || !rioWriteBulkString(r,"CREATE",6) || !rioWriteBulkObject(r,key) || !rioWriteBulkString(r,(char*)ri.key,ri.key_len) || - !rioWriteBulkStreamID(r,&group->last_id)) + !rioWriteBulkStreamID(r,&group->last_id) || + !rioWriteBulkString(r,"ENTRIESREAD",11) || + !rioWriteBulkLongLong(r,group->entries_read)) { raxStop(&ri); streamIteratorStop(&si); @@ -2332,7 +2366,7 @@ int rewriteAppendOnlyFileBackground(void) { server.aof_selected_db = -1; flushAppendOnlyFile(1); if (openNewIncrAofForAppend() != C_OK) return C_ERR; - + server.stat_aof_rewrites++; if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) { char tmpfile[256]; @@ -2388,7 +2422,10 @@ void aofRemoveTempFile(pid_t childpid) { bg_unlink(tmpfile); } -off_t getAppendOnlyFileSize(sds filename) { +/* Get size of an AOF file. + * The status argument is an optional output argument to be filled with + * one of the AOF_ status values. */ +off_t getAppendOnlyFileSize(sds filename, int *status) { struct redis_stat sb; off_t size; mstime_t latency; @@ -2396,10 +2433,12 @@ off_t getAppendOnlyFileSize(sds filename) { sds aof_filepath = makePath(server.aof_dirname, filename); latencyStartMonitor(latency); if (redis_stat(aof_filepath, &sb) == -1) { + if (status) *status = errno == ENOENT ? AOF_NOT_EXIST : AOF_OPEN_ERR; serverLog(LL_WARNING, "Unable to obtain the AOF file %s length. stat: %s", filename, strerror(errno)); size = 0; } else { + if (status) *status = AOF_OK; size = sb.st_size; } latencyEndMonitor(latency); @@ -2408,22 +2447,27 @@ off_t getAppendOnlyFileSize(sds filename) { return size; } -off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am) { +/* Get size of all AOF files referred by the manifest (excluding history). + * The status argument is an output argument to be filled with + * one of the AOF_ status values. */ +off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am, int *status) { off_t size = 0; - listNode *ln; listIter li; if (am->base_aof_info) { serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE); - size += getAppendOnlyFileSize(am->base_aof_info->file_name); + + size += getAppendOnlyFileSize(am->base_aof_info->file_name, status); + if (*status != AOF_OK) return 0; } listRewind(am->incr_aof_list, &li); while ((ln = listNext(&li)) != NULL) { aofInfo *ai = (aofInfo*)ln->value; serverAssert(ai->file_type == AOF_FILE_TYPE_INCR); - size += getAppendOnlyFileSize(ai->file_name); + size += getAppendOnlyFileSize(ai->file_name, status); + if (*status != AOF_OK) return 0; } return size; @@ -2497,7 +2541,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { if (server.aof_fd != -1) { /* AOF enabled. */ server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ - server.aof_current_size = getAppendOnlyFileSize(new_base_filename) + server.aof_last_incr_size; + server.aof_current_size = getAppendOnlyFileSize(new_base_filename, NULL) + server.aof_last_incr_size; server.aof_rewrite_base_size = server.aof_current_size; server.aof_fsync_offset = server.aof_current_size; server.aof_last_fsync = server.unixtime; |