diff options
Diffstat (limited to 'src/key-value-store/database/kissdb.c')
-rw-r--r-- | src/key-value-store/database/kissdb.c | 2773 |
1 files changed, 1760 insertions, 1013 deletions
diff --git a/src/key-value-store/database/kissdb.c b/src/key-value-store/database/kissdb.c index 4c8e7b6..6a4d119 100644 --- a/src/key-value-store/database/kissdb.c +++ b/src/key-value-store/database/kissdb.c @@ -1,8 +1,8 @@ - /****************************************************************************** - * Project Persistency - * (c) copyright 2014 - * Company XS Embedded GmbH - *****************************************************************************/ +/****************************************************************************** +* Project Persistency +* (c) copyright 2014 +* Company XS Embedded GmbH +*****************************************************************************/ /* (Keep It) Simple Stupid Database * * Written by Adam Ierymenko <adam.ierymenko@zerotier.com> @@ -17,14 +17,8 @@ /* Note: big-endian systems will need changes to implement byte swapping * on hash table file I/O. Or you could just use it as-is if you don't care * that your database files will be unreadable on little-endian systems. */ - #define _FILE_OFFSET_BITS 64 -#define TMP_BUFFER_LENGTH 128 #define KISSDB_HEADER_SIZE sizeof(Header_s) -#define __useBackups -//#define __useFileMapping -//#define __writeThrough -#define __checkerror #include "./kissdb.h" #include "../crc32.h" @@ -37,11 +31,20 @@ #include <sys/mman.h> #include <unistd.h> #include <ctype.h> +#include <sys/stat.h> #include <sys/time.h> +#include <semaphore.h> +#include <dlt.h> +#include "persComErrors.h" -#include "dlt.h" +// +//#ifdef COND_GCOV +//extern void __gcov_flush(void); +//#endif -DLT_DECLARE_CONTEXT(persComLldbDLTCtx); +//#define PFS_TEST +//extern DltContext persComLldbDLTCtx; +DLT_IMPORT_CONTEXT (persComLldbDLTCtx) #ifdef __showTimeMeasurements inline long long getNsDuration(struct timespec* start, struct timespec* end) @@ -51,43 +54,59 @@ inline long long getNsDuration(struct timespec* start, struct timespec* end) #endif /* djb2 hash function */ -static uint64_t KISSDB_hash(const void *b, unsigned long len) +static uint64_t KISSDB_hash(const void* b, unsigned long len) { unsigned long i; uint64_t hash = 5381; for (i = 0; i < len; ++i) - hash = ((hash << 5) + hash) + (uint64_t) (((const uint8_t *) b)[i]); + { + hash = ((hash << 5) + hash) + (uint64_t) (((const uint8_t*) b)[i]); + } return hash; } +#if 1 //returns a name for shared memory objects beginning with a slash followed by "path" (non alphanumeric chars are replaced with '_') appended with "tailing" -char * kdbGetShmName(const char *tailing, const char * path) +char* kdbGetShmName(const char* tailing, const char* path) { - char * result = (char *) malloc(1 + strlen(path) + strlen(tailing) + 1); //free happens at lifecycle shutdown + int pathLen = strlen(path); + int tailLen = strlen(tailing); + char* result = (char*) malloc(1 + pathLen + tailLen + 1); //free happens at lifecycle shutdown int i =0; int x = 1; if (result != NULL) { result[0] = '/'; - for (i = 0; i < strlen(path); i++) + for (i = 0; i < pathLen; i++) { if (!isalnum(path[i])) + { result[i + 1] = '_'; + } else + { result[i + 1] = path[i]; + } } - for (x = 0; x < strlen(tailing); x++) + for (x = 0; x < tailLen; x++) { result[i + x + 1] = tailing[x]; } result[i + x + 1] = '\0'; } + else + { + result = NULL; + } return result; } +#endif + + //returns -1 on error and positive value for success -int kdbShmemOpen(const char * name, size_t length, Kdb_bool* shmCreator) +int kdbShmemOpen(const char* name, size_t length, Kdb_bool* shmCreator) { int result; result = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); @@ -98,111 +117,268 @@ int kdbShmemOpen(const char * name, size_t length, Kdb_bool* shmCreator) *shmCreator = Kdb_false; result = shm_open(name, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); if (result < 0) + { return -1; + } } } else { *shmCreator = Kdb_true; if (ftruncate(result, length) < 0) + { return -1; + } } return result; } -void Kdb_wrlock(pthread_rwlock_t * wrlock) +void Kdb_wrlock(pthread_rwlock_t* wrlock) { pthread_rwlock_wrlock(wrlock); } -void Kdb_rdlock(pthread_rwlock_t * rdlock) -{ - pthread_rwlock_rdlock(rdlock); -} +//void Kdb_rdlock(pthread_rwlock_t* rdlock) +//{ +// pthread_rwlock_rdlock(rdlock); +//} -void Kdb_unlock(pthread_rwlock_t * lock) +void Kdb_unlock(pthread_rwlock_t* lock) { pthread_rwlock_unlock(lock); } -Kdb_bool kdbShmemClose(int shmem, const char * shmName) +Kdb_bool kdbShmemClose(int shmem, const char* shmName) { if( close(shmem) == -1) + { return Kdb_false; + } if( shm_unlink(shmName) < 0) + { return Kdb_false; + } return Kdb_true; } -void * getKdbShmemPtr(int shmem, size_t length) +void* getKdbShmemPtr(int shmem, size_t length) { void* result = mmap(NULL, length, PROT_READ | PROT_WRITE, MAP_SHARED, shmem, 0); if (result == MAP_FAILED) - return ((void *) -1); + { + return ((void*) -1); + } return result; } -Kdb_bool freeKdbShmemPtr(void * shmem_ptr, size_t length) + +Kdb_bool freeKdbShmemPtr(void* shmem_ptr, size_t length) { if(munmap(shmem_ptr, length) == 0) + { return Kdb_true; + } else + { return Kdb_false; + } } -Kdb_bool resizeKdbShmem(int shmem, Hashtable_slot_s** shmem_ptr, size_t oldLength, size_t newLength) +Kdb_bool resizeKdbShmem(int shmem, Hashtable_s** shmem_ptr, size_t oldLength, size_t newLength) { //unmap shm with old size if( freeKdbShmemPtr(*shmem_ptr, oldLength) == Kdb_false) + { return Kdb_false; + } if (ftruncate(shmem, newLength) < 0) + { return Kdb_false; + } //get pointer to resized shm with new Length *shmem_ptr = getKdbShmemPtr(shmem, newLength); - if(*shmem_ptr == ((void *) -1)) + if(*shmem_ptr == ((void*) -1)) + { return Kdb_false; + } return Kdb_true; } -#ifdef __writeThrough -Kdb_bool remapKdbShmem(int shmem, uint64_t** shmem_ptr, size_t oldLength, size_t newLength) + +Kdb_bool remapSharedHashtable(int shmem, Hashtable_s** shmem_ptr, size_t oldLength, size_t newLength ) { - //unmap shm with old size - if( freeKdbShmemPtr(*shmem_ptr, oldLength) == Kdb_false ) + //unmap hashtable with old size + if( freeKdbShmemPtr(*shmem_ptr, oldLength) == Kdb_false) + { return Kdb_false; + } //get pointer to resized shm with new Length *shmem_ptr = getKdbShmemPtr(shmem, newLength); - if(*shmem_ptr == ((void *) -1)) + if(*shmem_ptr == ((void*) -1)) + { return Kdb_false; + } return Kdb_true; } + +#if 0 +void printKdb(KISSDB* db) +{ + printf("START ############################### \n"); + printf("db->htSize: %d \n", db->htSize); + printf("db->cacheReferenced: %d \n", db->cacheReferenced); + printf("db->keySize: %" PRId64 " \n", db->keySize); + printf("db->valSize: %" PRId64 " \n", db->valSize); + printf("db->htSizeBytes: %" PRId64 " \n", db->htSizeBytes); + printf("db->htMappedSize: %" PRId64 " \n", db->htMappedSize); + printf("db->dbMappedSize: %" PRId64 " \n", db->dbMappedSize); + printf("db->shmCreator: %d \n", db->shmCreator); + printf("db->alreadyOpen: %d \n", db->alreadyOpen); + printf("db->hashTables: %p \n", db->hashTables); + printf("db->mappedDb: %p \n", db->mappedDb); + printf("db->sharedCache: %p \n", db->sharedCache); + printf("db->sharedFd: %d \n", db->sharedFd); + printf("db->htFd: %d \n", db->htFd); + printf("db->sharedCacheFd: %d \n", db->sharedCacheFd); + printf("db->semName: %s \n", db->semName); + printf("db->sharedName: %s \n", db->sharedName); + printf("db->cacheName: %s \n", db->cacheName); + printf("db->htName: %s \n", db->htName); + printf("db->shared: %p \n", db->shared); + printf("db->tbl: %p \n", db->tbl[0]); + printf("db->kdbSem: %p \n", db->kdbSem); + printf("db->fd: %d \n", db->fd); + printf("END ############################### \n"); +} #endif -int KISSDB_open(KISSDB *db, const char *path, int mode, uint16_t hash_table_size, uint64_t key_size, - uint64_t value_size) +int KISSDB_open(KISSDB* db, const char* path, int openMode, int writeMode, uint16_t hash_table_size, uint64_t key_size, uint64_t value_size) { - Hashtable_slot_s *httmp; - Kdb_bool tmp_creator; + Hashtable_s* htptr; int ret = 0; + Kdb_bool htFound; + Kdb_bool tmpCreator; + off_t offset = 0; + size_t firstMappSize; + struct stat sb; - //TODO check if usage of O_SYNC O_DIRECT flags is needed. If O_SYNC and O_DIrect is specified, no additional fsync calls are needed after fflush - if(mode == KISSDB_OPEN_MODE_RWCREAT) - db->fd = open(path, O_CREAT | O_RDWR , S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH ); //gets closed when db->f is closed + if (db->alreadyOpen == Kdb_false) //check if this instance has already opened the db before + { + db->sharedName = kdbGetShmName("-shm-info", path); + if (db->sharedName == NULL) + { + return KISSDB_ERROR_MALLOC; + } + db->sharedFd = kdbShmemOpen(db->sharedName, sizeof(Shared_Data_s), &db->shmCreator); + if (db->sharedFd < 0) + { + return KISSDB_ERROR_OPEN_SHM; + } + db->shared = (Shared_Data_s*) getKdbShmemPtr(db->sharedFd, sizeof(Shared_Data_s)); + if (db->shared == ((void*) -1)) + { + return KISSDB_ERROR_MAP_SHM; + } + + db->sharedCacheFd = -1; + db->mappedDb = NULL; + + if (db->shmCreator == Kdb_true) + { + //[Initialize rwlock attributes] + pthread_rwlockattr_t rwlattr; + pthread_rwlockattr_init(&rwlattr); + pthread_rwlockattr_setpshared(&rwlattr, PTHREAD_PROCESS_SHARED); + pthread_rwlock_init(&db->shared->rwlock, &rwlattr); + + Kdb_wrlock(&db->shared->rwlock); + + //init cache filedescriptor, reference counter and hashtable number + db->sharedCacheFd = -1; + db->shared->refCount = 0; + db->shared->htNum = 0; + db->shared->mappedDbSize = 0; + db->shared->writeMode = writeMode; + db->shared->openMode = openMode; + } + else + { + Kdb_wrlock(&db->shared->rwlock); + } + } else - db->fd = open(path, O_RDWR , S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH ); //gets closed when db->f is closed + { + Kdb_wrlock(&db->shared->rwlock); + } + switch (db->shared->openMode) + { + case KISSDB_OPEN_MODE_RWCREAT: + { + //create database + db->fd = open(path, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + break; + } + case KISSDB_OPEN_MODE_RDWR: + { + //read / write mode + db->fd = open(path, O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + break; + } + case KISSDB_OPEN_MODE_RDONLY: + { + db->fd = open(path, O_RDONLY); + break; + } + default: + { + break; + } + } if(db->fd == -1) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": Opening database file: <"); DLT_STRING(path); DLT_STRING("> failed: "); DLT_STRING(strerror(errno))); return KISSDB_ERROR_IO; + } - if (lseek(db->fd, 0, SEEK_END) == -1) + if( 0 != fstat(db->fd, &sb)) { - close(db->fd); return KISSDB_ERROR_IO; } - if (lseek(db->fd, 0, SEEK_CUR) < KISSDB_HEADER_SIZE) + + + /* mmap whole database file if it already exists (else the file is mapped in writeheader()) */ + if (sb.st_size > 0) + { + if(db->shared->openMode != KISSDB_OPEN_MODE_RDONLY ) + { + db->mappedDb = (void*) mmap(NULL, sb.st_size, PROT_WRITE | PROT_READ, MAP_SHARED, db->fd, 0); + } + else + { + db->mappedDb = (void*) mmap(NULL, sb.st_size, PROT_READ, MAP_SHARED, db->fd, 0); + } + if (db->mappedDb == MAP_FAILED) + { + return KISSDB_ERROR_IO; + } + else + { + //update mapped size + db->shared->mappedDbSize = (uint64_t)sb.st_size; + db->dbMappedSize = db->shared->mappedDbSize; + } + } + + offset = sb.st_size; + if (offset == -1) + { + return KISSDB_ERROR_IO; + } + if ( offset < KISSDB_HEADER_SIZE) { /* write header if not already present */ if ((hash_table_size) && (key_size) && (value_size)) @@ -210,1029 +386,891 @@ int KISSDB_open(KISSDB *db, const char *path, int mode, uint16_t hash_table_size ret = writeHeader(db, &hash_table_size, &key_size, &value_size); if(0 != ret) { - close(db->fd); return ret; } - //Seek behind header - if (lseek(db->fd, KISSDB_HEADER_SIZE, SEEK_SET) == -1) - { - close(db->fd); - return KISSDB_ERROR_IO; - } } else { - close(db->fd); return KISSDB_ERROR_INVALID_PARAMETERS; } } else { - //read existing header + /* read existing header to verify database version */ ret = readHeader(db, &hash_table_size, &key_size, &value_size); if( 0 != ret) - return ret; - - if (lseek(db->fd, KISSDB_HEADER_SIZE, SEEK_SET) == -1) { - close(db->fd); - return KISSDB_ERROR_IO; - } //Seek behind header + return ret; + } } //store non shared db information - db->hash_table_size = hash_table_size; - db->key_size = key_size; - db->value_size = value_size; - db->hash_table_size_bytes = sizeof(Hashtable_slot_s) * (hash_table_size + 1); /* [hash_table_size] == next table */ + db->htSize = hash_table_size; + db->keySize = key_size; + db->valSize = value_size; + db->htSizeBytes = sizeof(Hashtable_s); - //DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("Hashtable size in bytes: "), DLT_UINT64(db->hash_table_size_bytes)); - - if (db->already_open == Kdb_false) //check if this instance has already opened the db before + if (db->alreadyOpen == Kdb_false) //check if this instance has already opened the db before { - db->shmem_cached_name = kdbGetShmName("-cache", path); - if(db->shmem_cached_name == NULL) - return KISSDB_ERROR_MALLOC; - db->shmem_info_name = kdbGetShmName("-shm-info", path); - if(db->shmem_info_name == NULL) + db->cacheName = kdbGetShmName("-cache", path); + if(db->cacheName == NULL) + { return KISSDB_ERROR_MALLOC; - db->shmem_info_fd = kdbShmemOpen(db->shmem_info_name, sizeof(Shared_Data_s), &db->shmem_creator); - if(db->shmem_info_fd < 0) - return KISSDB_ERROR_OPEN_SHM; - db->shmem_info = (Shared_Data_s *) getKdbShmemPtr(db->shmem_info_fd, sizeof(Shared_Data_s)); - if(db->shmem_info == ((void *) -1)) - return KISSDB_ERROR_MAP_SHM; - - size_t first_mapping; - if(db->shmem_info->shmem_size > db->hash_table_size_bytes ) - first_mapping = db->shmem_info->shmem_size; + } + //check if more than one hashtable is already in shared memory + if(db->shared->htShmSize > db->htSizeBytes ) + { + firstMappSize = db->shared->htShmSize; + } else - first_mapping = db->hash_table_size_bytes; + { + firstMappSize = db->htSizeBytes; + } //open / create shared memory for first hashtable - db->shmem_ht_name = kdbGetShmName("-ht", path); - if(db->shmem_ht_name == NULL) + db->htName = kdbGetShmName("-ht", path); + if(db->htName == NULL) + { return KISSDB_ERROR_MALLOC; - db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, first_mapping, &tmp_creator); - if(db->shmem_ht_fd < 0) + } + db->htFd = kdbShmemOpen(db->htName, firstMappSize, &tmpCreator); + if(db->htFd < 0) + { return KISSDB_ERROR_OPEN_SHM; - db->hash_tables = (Hashtable_slot_s *) getKdbShmemPtr(db->shmem_ht_fd, first_mapping); - if(db->hash_tables == ((void *) -1)) - return KISSDB_ERROR_MAP_SHM; - db->old_mapped_size = first_mapping; //local information - - //if shared memory for rwlock was opened (created) with this call to KISSDB_open for the first time -> init rwlock - if (db->shmem_creator == Kdb_true) + } + db->hashTables = (Hashtable_s*) getKdbShmemPtr(db->htFd, firstMappSize); + if(db->hashTables == ((void*) -1)) { - //[Initialize rwlock attributes] - pthread_rwlockattr_t rwlattr, cache_rwlattr; - pthread_rwlockattr_init(&rwlattr); - pthread_rwlockattr_init(&cache_rwlattr); - pthread_rwlockattr_setpshared(&rwlattr, PTHREAD_PROCESS_SHARED); - pthread_rwlockattr_setpshared(&cache_rwlattr, PTHREAD_PROCESS_SHARED); - pthread_rwlock_init(&db->shmem_info->rwlock, &rwlattr); - pthread_rwlock_init(&db->shmem_info->cache_rwlock, &cache_rwlattr); - Kdb_wrlock(&db->shmem_info->rwlock); - -#ifdef __checkerror - //CHECK POWERLOSS FLAGS - ret = checkErrorFlags(db); - if (0 != ret) - { - close(db->fd); - Kdb_unlock(&db->shmem_info->rwlock); - return ret; - } -#endif - db->shmem_info->num_hash_tables = 0; + return KISSDB_ERROR_MAP_SHM; } - else // already initialized - Kdb_wrlock(&db->shmem_info->rwlock); - - db->already_open = Kdb_true; + db->htMappedSize = firstMappSize; + db->alreadyOpen = Kdb_true; } - else - Kdb_wrlock(&db->shmem_info->rwlock); - //only read header from file into memory for first caller of KISSDB_open - if (db->shmem_creator == Kdb_true) + /* + * Read hashtables from file into memory ONLY for first caller of KISSDB_open + * Determine number of existing hashtables (db->shared->htNum) * + */ + if (db->shmCreator == Kdb_true ) { - httmp = (Hashtable_slot_s*) malloc(db->hash_table_size_bytes); //read hashtable from file - if (!httmp) + uint64_t offset = KISSDB_HEADER_SIZE; + //only read hashtables from file if file is larger than header + hashtable size + if(db->shared->mappedDbSize >= ( KISSDB_HEADER_SIZE + db->htSizeBytes) ) { - close(db->fd); - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_MALLOC; - } - while (read(db->fd, httmp, db->hash_table_size_bytes) == db->hash_table_size_bytes) - { - Kdb_bool result = Kdb_false; - //if new size would exceed old shared memory size-> allocate additional memory page to shared memory - if (db->hash_table_size_bytes * (db->shmem_info->num_hash_tables + 1) > db->old_mapped_size) + htptr = (Hashtable_s*) ( db->mappedDb + KISSDB_HEADER_SIZE); + htFound = Kdb_true; + while (htFound && offset < db->dbMappedSize ) { - Kdb_bool temp; - if (db->shmem_ht_fd <= 0) + //check for existing start OR end delimiter of hashtable + if (htptr->delimStart == HASHTABLE_START_DELIMITER || htptr->delimEnd == HASHTABLE_END_DELIMITER) { - db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp); - if(db->shmem_ht_fd < 0) + //if new size would exceed old shared memory size-> allocate additional memory page to shared memory + Kdb_bool result = Kdb_false; + if ( (db->htSizeBytes * (db->shared->htNum + 1)) > db->htMappedSize) + { + Kdb_bool temp; + if (db->htFd <= 0) + { + db->htFd = kdbShmemOpen(db->htName, db->htMappedSize, &temp); + if(db->htFd < 0) + { + return KISSDB_ERROR_OPEN_SHM; + } + } + result = resizeKdbShmem(db->htFd, &db->hashTables, db->htMappedSize, db->htMappedSize + db->htSizeBytes); + if (result == Kdb_false) + { + return KISSDB_ERROR_RESIZE_SHM; + } + else + { + db->shared->htShmSize = db->htMappedSize + db->htSizeBytes; + db->htMappedSize = db->shared->htShmSize; + } + } + // copy the current hashtable read from file to (htadress + (htsize * htcount)) in memory + memcpy(((uint8_t*) db->hashTables) + (db->htSizeBytes * db->shared->htNum), htptr, db->htSizeBytes); + ++db->shared->htNum; + + //read until all linked hashtables have been read + if (htptr->slots[db->htSize].offsetA ) //if a offset to a further hashtable exists { - free(httmp); - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_OPEN_SHM; + htptr = (Hashtable_s*) (db->mappedDb + htptr->slots[db->htSize].offsetA); //follow link to next hashtable + offset = htptr->slots[db->htSize].offsetA; + } + else //no link to next hashtable or link is invalid + { + htFound = Kdb_false; } } - result = resizeKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->old_mapped_size + db->hash_table_size_bytes); - if (result == Kdb_false) - { - free(httmp); - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_RESIZE_SHM; - } - else + else //delimiters of first hashtable or linked hashtable are invalid { - db->shmem_info->shmem_size = db->old_mapped_size + db->hash_table_size_bytes; - db->old_mapped_size = db->old_mapped_size + db->hash_table_size_bytes; + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": hashtable delimiters are invalid -> rebuild not possible!")); + htFound = Kdb_false; } } - // copy the current hashtable read from file to (htadress + (htsize * htcount)) in memory - memcpy(((uint8_t *) db->hash_tables) + (db->hash_table_size_bytes * db->shmem_info->num_hash_tables), httmp, db->hash_table_size_bytes); - ++db->shmem_info->num_hash_tables; - - //read until all hash tables have been read - if (httmp[db->hash_table_size].offsetA) //if httable[hash_table_size] contains a offset to a further hashtable + } + /* + * CHECK POWERLOSS FLAGS AND REBUILD DATABASE IF NECESSARY + */ + if (db->shared->openMode != KISSDB_OPEN_MODE_RDONLY) + { + if (checkErrorFlags(db) != 0) { - //ONE MORE HASHTABLE FOUND - if (lseek(db->fd, httmp[db->hash_table_size].offsetA, SEEK_SET) == -1) - { //move the filepointer to the next hashtable in the file - KISSDB_close(db); - free(httmp); - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(": database was not closed correctly in last lifecycle!")); + if (verifyHashtableCS(db) != 0) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(": A hashtable is invalid -> Start rebuild of hashtables!")); + if (rebuildHashtables(db) != 0) //hashtables are corrupt, walk through the database and search for data blocks -> then rebuild the hashtables + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": hashtable rebuild failed!")); + } + else + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_DEBUG, DLT_STRING(__FUNCTION__); DLT_STRING(": hashtable rebuild successful!")); + } + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":Start datablock check / recovery!")); + recoverDataBlocks(db); + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":End datablock check / recovery!")); + } + else + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":Start datablock check / recovery!")); + recoverDataBlocks(db); + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":End datablock check / recovery!")); } } - else - break; // no further hashtables exist } - free(httmp); } - - //printSharedHashtable(db); - - Kdb_unlock(&db->shmem_info->rwlock); + Kdb_unlock(&db->shared->rwlock); return 0; } - - - -int KISSDB_close(KISSDB *db) +int KISSDB_close(KISSDB* db) { - Kdb_wrlock(&db->shmem_info->rwlock); +#ifdef PFS_TEST + printf(" START: KISSDB_CLOSE \n"); +#endif - uint64_t crc = 0; + Hashtable_s* htptr = NULL; Header_s* ptr = 0; -#ifdef __showTimeMeasurements - long long KdbDuration = 0; - struct timespec mmapStart, mmapEnd; - KdbDuration = 0; -#endif + uint64_t crc = 0; + + Kdb_wrlock(&db->shared->rwlock); - //printSharedHashtable(db); - if (db->shmem_creator == Kdb_true) + //if no other instance has opened the database + if( db->shared->refCount == 0) { - //free shared hashtable - if( freeKdbShmemPtr(db->hash_tables, db->old_mapped_size) == Kdb_false) + if (db->shared->openMode != KISSDB_OPEN_MODE_RDONLY) { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_UNMAP_SHM; + if(db->htMappedSize < db->shared->htShmSize) + { + if ( Kdb_false == remapSharedHashtable(db->htFd, &db->hashTables, db->htMappedSize, db->shared->htShmSize)) + { + return KISSDB_ERROR_RESIZE_SHM; + } + else + { + db->htMappedSize = db->shared->htShmSize; + } + } + //remap database file if in the meanwhile another process added new data (key value pairs / hashtables) to the file (only happens if writethrough is used) + if (db->dbMappedSize < db->shared->mappedDbSize) + { + db->mappedDb = mremap(db->mappedDb, db->dbMappedSize, db->shared->mappedDbSize, MREMAP_MAYMOVE); + if (db->mappedDb == MAP_FAILED) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":mremap error: !"), DLT_STRING(strerror(errno))); + return KISSDB_ERROR_IO; + } + else + { + db->dbMappedSize = db->shared->mappedDbSize; + } + } + + // generate checksum for every hashtable and write crc to file + if (db->fd) + { + int i = 0; + int offset = sizeof(Header_s); //offset in file to first hashtable + if (db->shared->htNum > 0) //if hashtables exist + { + //write hashtables and crc to file + for (i = 0; i < db->shared->htNum; i++) + { + crc = 0; + crc = (uint64_t) pcoCrc32(crc, (unsigned char*) db->hashTables[i].slots, sizeof(db->hashTables[i].slots)); + db->hashTables[i].crc = crc; + htptr = (Hashtable_s*) (db->mappedDb + offset); + //copy hashtable and generated crc from shared memory to mapped hashtable in file + memcpy(htptr, &db->hashTables[i], db->htSizeBytes); + offset = db->hashTables[i].slots[db->htSize].offsetA; + } + } + } + //update header (close flags) + ptr = (Header_s*) db->mappedDb; + ptr->closeFailed = 0x00; //remove closeFailed flag + ptr->closeOk = 0x01; //set closeOk flag + msync(db->mappedDb, KISSDB_HEADER_SIZE, MS_SYNC); } - if( kdbShmemClose(db->shmem_ht_fd, db->shmem_ht_name) == Kdb_false) - return KISSDB_ERROR_CLOSE_SHM; - free(db->shmem_ht_name); - Kdb_unlock(&db->shmem_info->rwlock); - pthread_rwlock_destroy(&db->shmem_info->rwlock); - pthread_rwlock_destroy(&db->shmem_info->cache_rwlock); + //unmap whole database file + munmap(db->mappedDb, db->dbMappedSize); + db->mappedDb = NULL; - // free shared information - if (freeKdbShmemPtr(db->shmem_info, sizeof(Kdb_bool)) == Kdb_false) - return KISSDB_ERROR_UNMAP_SHM; - if (kdbShmemClose(db->shmem_info_fd, db->shmem_info_name) == Kdb_false) + //unmap shared hashtables + munmap(db->hashTables, db->htMappedSize); + db->hashTables = NULL; + //close shared memory for hashtables + if( kdbShmemClose(db->htFd, db->htName) == Kdb_false) + { + close(db->fd); + Kdb_unlock(&db->shared->rwlock); return KISSDB_ERROR_CLOSE_SHM; - free(db->shmem_info_name); + } + db->htFd = 0; -#ifdef __showTimeMeasurements - clock_gettime(CLOCK_ID, &mmapStart); -#endif + if(db->htName != NULL) + { + free(db->htName); + db->htName = NULL; + } - //update header (checksum and flags) - int mapFlag = PROT_WRITE | PROT_READ; - ptr = (Header_s*) mmap(NULL,KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0); - if (ptr == MAP_FAILED) + //free rwlocks + Kdb_unlock(&db->shared->rwlock); + pthread_rwlock_destroy(&db->shared->rwlock); + + // unmap shared information + munmap(db->shared, sizeof(Shared_Data_s)); + db->shared = NULL; + + if (kdbShmemClose(db->sharedFd, db->sharedName) == Kdb_false) { close(db->fd); - return KISSDB_ERROR_IO; + return KISSDB_ERROR_CLOSE_SHM; } -#ifdef __checkerror - // generate checksum over database file (beginning at file offset [sizeof(ptr->KdbV) + sizeof(ptr->checksum)] up to EOF) - if( db->fd ) + db->sharedFd =0; + if(db->sharedName != NULL) { - crc = 0; - crc = (uint64_t) pcoCalcCrc32Csum(db->fd, sizeof(Header_s) ); - ptr->checksum = crc; - //printf("CLOSING ------ DB: %s, WITH CHECKSUM CALCULATED: %" PRIu64 " \n", db->shmem_ht_name, ptr->checksum); + free(db->sharedName); + db->sharedName = NULL; } -#endif - ptr->closeFailed = 0x00; //remove closeFailed flag - ptr->closeOk = 0x01; //set closeOk flag - - //sync changes with file - if( 0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE)) + if(db->cacheName != NULL) { - close(db->fd); - return KISSDB_ERROR_IO; + free(db->cacheName); //free memory for name obtained by kdbGetShmName() function + db->cacheName = NULL; } - //unmap memory - if( 0 != munmap(ptr, KISSDB_HEADER_SIZE)) + if( db->fd) { close(db->fd); - return KISSDB_ERROR_IO; + db->fd = 0; + } + + db->alreadyOpen = Kdb_false; + db->htSize = 0; + //db->cacheReferenced = 0; + db->keySize = 0; + db->valSize = 0; + db->htSizeBytes = 0; + db->htMappedSize = 0; + db->dbMappedSize = 0; + db->shmCreator = 0; + db->alreadyOpen = 0; + + //destroy named semaphore + if (-1 == sem_post(db->kdbSem)) //release semaphore + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, + DLT_STRING(__FUNCTION__); DLT_STRING(": sem_post() failed: "), + DLT_STRING(strerror(errno))); + } + if (-1 == sem_close(db->kdbSem)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, + DLT_STRING(__FUNCTION__); DLT_STRING(": sem_close() failed: "), + DLT_STRING(strerror(errno))); + } + if (-1 == sem_unlink(db->semName)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, + DLT_STRING(__FUNCTION__); DLT_STRING(": sem_unlink() failed: "), + DLT_STRING(strerror(errno))); + } + db->kdbSem = NULL; + if(db->semName != NULL) + { + free(db->semName); + db->semName = NULL; } -#ifdef __showTimeMeasurements - clock_gettime(CLOCK_ID, &mmapEnd); - KdbDuration += getNsDuration(&mmapStart, &mmapEnd); - printf("mmap duration for => %f ms\n", (double)((double)KdbDuration/NANO2MIL)); -#endif - fsync(db->fd); + + } + else + { + //if caller of close is not the last instance using the database + //unmap whole database file + munmap(db->mappedDb, db->dbMappedSize); + db->mappedDb = NULL; + + //unmap shared hashtables + munmap(db->hashTables, db->htMappedSize); + db->hashTables = NULL; if( db->fd) + { close(db->fd); + db->fd = 0; + } + if(db->htFd) + { + close(db->htFd); + db->htFd = 0; + } + + db->alreadyOpen = Kdb_false; + db->htSize = 0; + //db->cacheReferenced = 0; + db->keySize = 0; + db->valSize = 0; + db->htSizeBytes = 0; + db->htMappedSize = 0; + db->dbMappedSize = 0; + db->shmCreator = 0; + db->alreadyOpen = 0; - db->already_open = Kdb_false; - //memset(db, 0, sizeof(KISSDB)); //todo check if necessary + Kdb_unlock(&db->shared->rwlock); + + // unmap shared information + munmap(db->shared, sizeof(Shared_Data_s)); + db->shared = NULL; + + if(db->sharedFd) + { + close(db->sharedFd); + db->sharedFd = 0; + } + if(db->htName != NULL) + { + free(db->htName); + db->htName = NULL; + } + if(db->sharedName != NULL) + { + free(db->sharedName); + db->sharedName = NULL; + } + if(db->cacheName != NULL) + { + free(db->cacheName); //free memory for name obtained by kdbGetShmName() function + db->cacheName = NULL; + } + //clean struct + if (-1 == sem_post(db->kdbSem)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, + DLT_STRING(__FUNCTION__); DLT_STRING(": sem_post() in close failed: "), + DLT_STRING(strerror(errno))); + } + if (-1 == sem_close(db->kdbSem)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, + DLT_STRING(__FUNCTION__); DLT_STRING(": sem_close() in close failed: "), + DLT_STRING(strerror(errno))); + } + db->kdbSem = NULL; + if(db->semName != NULL) + { + free(db->semName); + db->semName = NULL; + } } - else - //if caller is not the creator of the lock - Kdb_unlock(&db->shmem_info->rwlock); +#ifdef PFS_TEST + printf(" END: KISSDB_CLOSE \n"); +#endif return 0; } -int KISSDB_get(KISSDB *db, const void *key, void *vbuf) +int KISSDB_get(KISSDB* db, const void* key, void* vbuf, uint32_t bufsize, uint32_t* vsize) { - Kdb_rdlock(&db->shmem_info->rwlock); - - uint8_t tmp[TMP_BUFFER_LENGTH]; - uint64_t current; - const uint8_t *kptr; + const uint8_t* kptr; + DataBlock_s* block; + Hashtable_slot_s* hashTable; + int64_t offset; + Kdb_bool bCanContinue = Kdb_true; + Kdb_bool bKeyFound = Kdb_false; + uint64_t hash = 0; unsigned long klen, i; - long n = 0; - uint64_t checksum, backupChecksum, crc; - uint64_t hash = KISSDB_hash(key, db->key_size) % (uint64_t) db->hash_table_size; - int64_t offset, backupOffset, htoffset, checksumOffset, flagOffset; //lasthtoffset - Hashtable_slot_s *cur_hash_table; - -#ifdef __writeThrough - //if new one or more hashtables were appended, remap shared memory block to adress space - if (db->old_mapped_size < db->shmem_info->shmem_size) - { - Kdb_bool temp; - db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp); - if(db->shmem_ht_fd < 0) - return KISSDB_ERROR_OPEN_SHM; - res = remapKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->shmem_info->shmem_size); - if (res == Kdb_false) - return KISSDB_ERROR_REMAP_SHM; - db->old_mapped_size = db->shmem_info->shmem_size; + + klen = strlen(key); + hash = KISSDB_hash(key, klen) % (uint64_t) db->htSize; + + if(db->htMappedSize < db->shared->htShmSize) + { + if ( Kdb_false == remapSharedHashtable(db->htFd, &db->hashTables, db->htMappedSize, db->shared->htShmSize)) + { + return KISSDB_ERROR_RESIZE_SHM; + } + else + { + db->htMappedSize = db->shared->htShmSize; + } } -#endif - htoffset = KISSDB_HEADER_SIZE; //lasthtoffset - cur_hash_table = db->hash_tables;//pointer to current hashtable in memory - for (i = 0; i < db->shmem_info->num_hash_tables; ++i) + hashTable = db->hashTables[0].slots; //pointer to first hashtable in memory at first slot + + if (db->dbMappedSize < db->shared->mappedDbSize) + { + db->mappedDb = mremap(db->mappedDb, db->dbMappedSize, db->shared->mappedDbSize, MREMAP_MAYMOVE); + if (db->mappedDb == MAP_FAILED) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":mremap error: !"), DLT_STRING(strerror(errno))); + return KISSDB_ERROR_IO; + } + else + { + db->dbMappedSize = db->shared->mappedDbSize; + } + } + + + for (i = 0; i < db->shared->htNum; ++i) { - offset = cur_hash_table[hash].offsetA;//get fileoffset where the data can be found in the file -#ifdef __useBackups //get information about current valid offset to latest written data - if(cur_hash_table[hash].current == 0x00) //valid is offsetA + offset = (hashTable[hash].current == 0x00) ? hashTable[hash].offsetA : hashTable[hash].offsetB; // if 0x00 -> offsetA is latest else offsetB is latest + if(offset < 0) //deleted or invalidated data but search in next hashtable { - offset = cur_hash_table[hash].offsetA; - checksum = cur_hash_table[hash].checksumA; + bCanContinue = Kdb_false; //deleted datablock -> do not compare the key } else { - offset = cur_hash_table[hash].offsetB; - checksum = cur_hash_table[hash].checksumB; + bCanContinue = Kdb_true; //possible match -> compare the key } -#endif - if (offset >= KISSDB_HEADER_SIZE) //if a valid offset is available in the slot + if(Kdb_true == bCanContinue) { - if (lseek(db->fd, offset, SEEK_SET) == -1) //move filepointer to this offset + if( abs(offset) > db->dbMappedSize ) { - Kdb_unlock(&db->shmem_info->rwlock); return KISSDB_ERROR_IO; } - kptr = (const uint8_t *) key; - klen = db->key_size; - while (klen) + if (offset >= KISSDB_HEADER_SIZE) //if a valid offset is available in the slot { - n = (long) read(db->fd, tmp, (klen > sizeof(tmp)) ? sizeof(tmp) : klen); - if (n > 0) - { - if (memcmp(kptr, tmp, n))//if key does not match -> search in next hashtable - goto get_no_match_next_hash_table; - kptr += n; - klen -= (unsigned long) n; - } - else + block = (DataBlock_s*) (db->mappedDb + offset); + kptr = (const uint8_t*) key; + if (klen > 0) { - Kdb_unlock(&db->shmem_info->rwlock); - return 1; /* not found */ + if (memcmp(kptr, block->key, klen) + || strlen(block->key) != klen) //if search key does not match with key in file + { + bKeyFound = Kdb_false; + } + else + { + bKeyFound = Kdb_true; + } } - } - if (read(db->fd, vbuf, db->value_size) == db->value_size) //if key matches at the fileoffset -> read the value - { - //crc check for file content -#ifdef __useBackups - //only validate checksums at read if checksum of file is invalid - if (db->shmem_info->crc_invalid == Kdb_true) + if(Kdb_true == bKeyFound) { - //verify checksum of current key/value pair - crc = 0; - crc = (uint64_t) pcoCrc32(crc, (unsigned char*) vbuf, db->value_size); - if (checksum != crc) + //copy found value if buffer is big enough + if(bufsize >= block->valSize) { - //printf("KISSDB_get: WARNING: checksum invalid -> try to read from valid data block \n"); - //try to read valid data from backup - Hashtable_slot_s slot = cur_hash_table[hash]; - if (cur_hash_table[hash].current == 0x00) //current is offsetA, but Data there is corrupt--> so use offsetB as backupOffset - { - backupOffset = cur_hash_table[hash].offsetB; - backupChecksum = cur_hash_table[hash].checksumB; - checksumOffset = htoffset + (sizeof(Hashtable_slot_s) * hash + sizeof(slot.offsetA)); //offset that points to checksumA - current = 0x01; //current is offsetB - } - else - { - backupOffset = cur_hash_table[hash].offsetA; - backupChecksum = cur_hash_table[hash].checksumA; - checksumOffset = htoffset - + (sizeof(Hashtable_slot_s) * hash + sizeof(slot.offsetA) + sizeof(slot.checksumA) - + sizeof(slot.offsetB)); //offset that points to checksumB - current = 0x00; - } - flagOffset = htoffset - + (sizeof(Hashtable_slot_s) * hash + (sizeof(Hashtable_slot_s) - sizeof(slot.current))); //offset that points to currentflag - - //seek to backup data - if (lseek(db->fd, backupOffset + db->key_size, SEEK_SET) == -1) //move filepointer to data of key-value pair //TODO make checksum over key AND data ?? - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - - //verify checksum of backup key/value pair - //read from backup data - if (read(db->fd, vbuf, db->value_size) == db->value_size) //read value of backup Data block - { - //generate checksum of backup - crc = 0; - crc = (uint64_t) pcoCrc32(crc, (unsigned char*) vbuf, db->value_size); - if (crc == backupChecksum) //if checksum ok - { - //printf("KISSDB_get: WARNING: OVERWRITING CORRUPT DATA \n"); - //seek to corrupt data - if (lseek(db->fd, offset + db->key_size, SEEK_SET) == -1) //move filepointer to data of corrupt key-value pair - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - //overwrite corrupt data - if (write( db->fd, vbuf, db->value_size) != db->value_size ) //write value - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - //seek to header slot and update checksum of corrupt data (do not modify offsets) - if (lseek(db->fd, checksumOffset, SEEK_SET) == -1) //move to checksumX in file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, &crc, sizeof(uint64_t)) != sizeof(uint64_t) ) //write checksumX to hashtbale slot - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - //update checksumX in memory - if (cur_hash_table[hash].current == 0x00) //current is offsetA, but Data there is corrupt--> so update checksumA with new checksum - cur_hash_table[hash].checksumA = crc; - else - cur_hash_table[hash].checksumB = crc; - //switch current valid to backup - - if (lseek(db->fd, flagOffset, SEEK_SET) == -1) //move to current flag in file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, ¤t, sizeof(uint64_t)) != sizeof(uint64_t) ) //write current hashtable slot in file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - //update current valid in memory - cur_hash_table[hash].current = current; - //fsync(db->fd) - Kdb_unlock(&db->shmem_info->rwlock); - return 0; /* success */ - } - else //if checksum not valid, return NOT FOUND - { - Kdb_unlock(&db->shmem_info->rwlock); - return 1; /* not found */ - } - } - else - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } + memcpy(vbuf, block->value, block->valSize); } + *(vsize) = block->valSize; + return 0; /* success */ } -#endif - Kdb_unlock(&db->shmem_info->rwlock); - return 0; /* success */ } else { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; + return 1; /* not found */ } } - else - { - Kdb_unlock(&db->shmem_info->rwlock); - return 1; /* not found */ - } - //get_no_match_next_hash_table: cur_hash_table += db->hash_table_size + 1; - get_no_match_next_hash_table: //update lastht offset //lasthtoffset = htoffset - htoffset = cur_hash_table[db->hash_table_size].offsetA; // fileoffset to the next file-hashtable - cur_hash_table += (db->hash_table_size + 1); //pointer to the next memory-hashtable + hashTable = (Hashtable_slot_s*) ((char*) hashTable + sizeof(Hashtable_s)); //pointer to the next memory-hashtable } - Kdb_unlock(&db->shmem_info->rwlock); return 1; /* not found */ } -//TODO check current valid data to be deleted ? -int KISSDB_delete(KISSDB *db, const void *key) +int KISSDB_delete(KISSDB* db, const void* key, int32_t* bytesDeleted) { - Kdb_wrlock(&db->shmem_info->rwlock); - - uint8_t tmp[TMP_BUFFER_LENGTH]; - //uint64_t current = 0x00; - const uint8_t *kptr; - long n; - unsigned long klen, i; - //uint64_t crc = 0x00; - uint64_t hash = KISSDB_hash(key, db->key_size) % (uint64_t) db->hash_table_size; - //int64_t empty_offset = 0; - int64_t empty_offsetB = 0; + const uint8_t* kptr; + DataBlock_s* backupBlock; + DataBlock_s* block; + Hashtable_slot_s* hashTable; + int64_t backupOffset = 0; int64_t offset = 0; - int64_t htoffset = 0; - Hashtable_slot_s *cur_hash_table; + Kdb_bool bCanContinue = Kdb_true; + Kdb_bool bKeyFound = Kdb_false; + uint64_t hash = 0; + uint64_t crc = 0x00; + unsigned long klen, i; + + klen = strlen(key); + hash = KISSDB_hash(key, klen) % (uint64_t) db->htSize; + *(bytesDeleted) = PERS_COM_ERR_NOT_FOUND; -#ifdef __writeThrough - //if new hashtable was appended, remap shared memory block to adress space - if (db->old_mapped_size < db->shmem_info->shmem_size) + if(db->htMappedSize < db->shared->htShmSize) { - Kdb_bool temp; - db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp); - if(db->shmem_ht_fd < 0) - return KISSDB_ERROR_OPEN_SHM; - result = remapKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->shmem_info->shmem_size); - if (result == Kdb_false) - return KISSDB_ERROR_REMAP_SHM; - db->old_mapped_size = db->shmem_info->shmem_size; + if ( Kdb_false == remapSharedHashtable(db->htFd, &db->hashTables, db->htMappedSize, db->shared->htShmSize)) + { + return KISSDB_ERROR_RESIZE_SHM; + } + else + { + db->htMappedSize = db->shared->htShmSize; + } } -#endif - htoffset = KISSDB_HEADER_SIZE; - cur_hash_table = db->hash_tables; //pointer to current hashtable in memory + hashTable = db->hashTables->slots; //pointer to current hashtable in memory - for (i = 0; i < db->shmem_info->num_hash_tables; ++i) + //remap database file if in the meanwhile another process added new data (key value pairs / hashtables) to the file + if (db->dbMappedSize < db->shared->mappedDbSize) { - offset = cur_hash_table[hash].offsetA; //get fileoffset where the data can be found in the file - if (offset >= KISSDB_HEADER_SIZE) + db->mappedDb = mremap(db->mappedDb, db->dbMappedSize, db->shared->mappedDbSize, MREMAP_MAYMOVE); + if (db->mappedDb == MAP_FAILED) { - if (lseek(db->fd, offset, SEEK_SET) == -1) - { - //set filepointer to Key value offset in file - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - kptr = (const uint8_t *) key; - klen = db->key_size; - while (klen) + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":mremap error: !"), DLT_STRING(strerror(errno))); + return KISSDB_ERROR_IO; + } + db->dbMappedSize = db->shared->mappedDbSize; + } + + for (i = 0; i < db->shared->htNum; ++i) + { + //get information about current valid offset to latest written data + if (hashTable[hash].current == 0x00) //valid is offsetA + { + offset = hashTable[hash].offsetA; + backupOffset = hashTable[hash].offsetB; + } + else + { + offset = hashTable[hash].offsetB; + backupOffset = hashTable[hash].offsetA; + } + if (offset < 0) //deleted or invalidated data but search in next hashtable + { + bCanContinue = Kdb_false; + } + else + { + bCanContinue = Kdb_true; + } + if (Kdb_true == bCanContinue) + { + if (offset >= KISSDB_HEADER_SIZE) { - n = (long) read(db->fd, tmp, (klen > sizeof(tmp)) ? sizeof(tmp) : klen); - if (n > 0) + if( abs(offset) > db->dbMappedSize || abs(backupOffset) > db->dbMappedSize) { - if (memcmp(kptr, tmp, n))//if key does not match, search in next hashtable - goto get_no_match_next_hash_table; - kptr += n; - klen -= (unsigned long) n; + return KISSDB_ERROR_IO; } - else + + block = (DataBlock_s*) (db->mappedDb + offset); + kptr = (const uint8_t*) key; //pointer to search key + if (klen > 0) { - Kdb_unlock(&db->shmem_info->rwlock); - return 1; /* not found */ + if (memcmp(kptr, block->key, klen) || strlen(block->key) != klen) //if search key does not match with key in file + { + bKeyFound = Kdb_false; + } + else + { + bKeyFound = Kdb_true; + } + } + /* data to be deleted was found + write "deleted block delimiters" for both blocks and delete key / value */ + if (Kdb_true == bKeyFound) + { + block->delimStart = (offset < backupOffset) ? DATA_BLOCK_A_DELETED_START_DELIMITER : DATA_BLOCK_B_DELETED_START_DELIMITER; + //memset(block->key, 0, db->keySize); //do not delete key -> used in hashtable rebuild + memset(block->value, 0, db->valSize); + block->valSize = 0; + crc = 0x00; + crc = (uint32_t) pcoCrc32(crc, (unsigned char*)block->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t) ); + block->crc = crc; + block->delimEnd = (offset < backupOffset) ? DATA_BLOCK_A_DELETED_END_DELIMITER : DATA_BLOCK_B_DELETED_END_DELIMITER; + + backupBlock = (DataBlock_s*) (db->mappedDb + backupOffset); //map data and backup block + + backupBlock->delimStart = (backupOffset < offset) ? DATA_BLOCK_A_DELETED_START_DELIMITER : DATA_BLOCK_B_DELETED_START_DELIMITER; + //memset(backupBlock->key, 0, db->keySize); + memset(backupBlock->value, 0, db->valSize); + backupBlock->valSize = 0; + crc = 0x00; + crc = (uint32_t) pcoCrc32(crc, (unsigned char*)backupBlock->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t) ); + backupBlock->crc = crc; + backupBlock->delimEnd = (backupOffset < offset) ? DATA_BLOCK_A_DELETED_END_DELIMITER : DATA_BLOCK_B_DELETED_END_DELIMITER; + + + //negate offsetB, delete checksums and current flag in memory + hashTable[hash].offsetA = -hashTable[hash].offsetA; //negate offset in hashtable that points to the data + hashTable[hash].offsetB = -hashTable[hash].offsetB; + hashTable[hash].current = 0x00; + + *(bytesDeleted) = block->valSize; + return 0; /* success */ } } - //TODO: mmap Hashtable slot structure to avoid seeking -> align hashtables at a multiple of a pagesize -#ifdef __useFileMapping - empty_offsetB = -(offset + (db->key_size + db->value_size)); //todo check if offset is rewritten in put function ! - cur_hash_table[hash].offsetB = empty_offsetB; - cur_hash_table[hash].checksumA = 0x00; - cur_hash_table[hash].checksumB = 0x00; - cur_hash_table[hash].current = 0x00; - int testoffset= lseek(fd, 0, SEEK_CUR); //filepointer position - int myoffset = htoffset + (sizeof(Hashtable_slot_s) * hash); - - printf("Endoffset in file: %d , Offset for mmap: %d , size for mmap: %d \n", testoffset, myoffset, sizeof(Hashtable_slot_s)); - - //mmap the current hashtable slot - int mapFlag = PROT_WRITE | PROT_READ; - printf("In Delete: filedes: %d\n", db->fd); - htSlot = (Hashtable_slot_s*) mmap(NULL, sizeof(Hashtable_slot_s), mapFlag, MAP_SHARED, db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash) ); //TODO offset must be a multiple of pagesize - if (htSlot == MAP_FAILED) - { - printf("MMAP ERROR !\n"); - close(db->fd); - return KISSDB_ERROR_IO; - } - //do changes to slot in file - htSlot->offsetA = empty_offset; - htSlot->checksumA = 0x00; - htSlot->offsetB = empty_offsetB; - htSlot->checksumB = 0x00; - htSlot->current = 0x00; - - //sync changes with file - if (0 != msync(htSlot, sizeof(Hashtable_slot_s), MS_SYNC | MS_INVALIDATE)) - { - close(db->fd); - return KISSDB_ERROR_IO; - } - //unmap memory - if (0 != munmap(htSlot, sizeof(Hashtable_slot_s))) - { - close(db->fd); - return KISSDB_ERROR_IO; - } -#endif - if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move Filepointer to used slot in file-hashtable. - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - -#ifndef __useBackups - cur_hash_table[hash].offsetA = -offset; //negate offset in hashtable that points to the data - empty_offset = -offset; - //update hashtable slot in file header (delete existing offset information) - if (write( db->fd, &empty_offset, sizeof(int64_t)) != sizeof(int64_t) ) //mark slot in file-hashtable as deleted - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } -#endif - -#ifdef __useBackups - //negate offsetB, delete checksums and current flag in memory - cur_hash_table[hash].offsetA = -offset; //negate offset in hashtable that points to the data - empty_offsetB = -(offset + (db->key_size + db->value_size)); - cur_hash_table[hash].checksumA = 0x00; - cur_hash_table[hash].offsetB = empty_offsetB; - cur_hash_table[hash].checksumB = 0x00; - cur_hash_table[hash].current = 0x00; - if (write( db->fd, &cur_hash_table[hash], sizeof(Hashtable_slot_s)) != sizeof(Hashtable_slot_s) ) //write updated data in the file-hashtable slot + else { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; + return 1; /* not found */ //if no offset is found at hashed position in slots } -#endif - //TODO currently, no synchronus Filedescriptor is used!!!! fsync after fflush is needed to do synchronus writes - //fsync(db->fd) // associating a file stream with a synchronous file descriptor means that an fsync() call is not needed on the file descriptor after the fflush() - Kdb_unlock(&db->shmem_info->rwlock); - return 0; /* success */ - } - else - { - Kdb_unlock(&db->shmem_info->rwlock); - return 1; /* not found */ //if no offset is found at hashed position in ht } - get_no_match_next_hash_table: htoffset = cur_hash_table[db->hash_table_size].offsetA; // fileoffset to next ht in file - cur_hash_table += (db->hash_table_size + 1); //pointer to next hashtable in memory + hashTable = (Hashtable_slot_s*) ((char*) hashTable + sizeof(Hashtable_s)); //pointer to the next memory-hashtable } - Kdb_unlock(&db->shmem_info->rwlock); return 1; /* not found */ } -int KISSDB_put(KISSDB *db, const void *key, const void *value) -{ - Kdb_wrlock(&db->shmem_info->rwlock); +// To improve write amplifiction: sort the keys at writeback for sequential write +//return offset where key would be written if Kissdb_put with same key is called +//int determineKeyOffset(KISSDB* db, const void* key) +//{ +// /* +// * - hash the key, +// * - go through hashtables and get corresponding offset to hash +// * - if offset is negative return the inverse offset +// * - if key matches at file offset return this offset +// */ +// return 0; +//} - uint8_t tmp[TMP_BUFFER_LENGTH]; - uint64_t current = 0x00; - const uint8_t *kptr; - unsigned long klen, i; - uint64_t hash = KISSDB_hash(key, db->key_size) % (uint64_t) db->hash_table_size; - int64_t offset, endoffset, htoffset, lasthtoffset; - Hashtable_slot_s *cur_hash_table; + + + +int KISSDB_put(KISSDB* db, const void* key, const void* value, int valueSize, int32_t* bytesWritten) +{ + const uint8_t* kptr; + DataBlock_s* backupBlock; + DataBlock_s* block; + Hashtable_s* hashtable; + Hashtable_s* htptr; + Hashtable_slot_s* hashTable; + int64_t offset, backupOffset, endoffset; + Kdb_bool bKeyFound = Kdb_false; Kdb_bool result = Kdb_false; Kdb_bool temp = Kdb_false; uint64_t crc = 0x00; - long n; - char delimiter[8] = "||||||||"; + uint64_t hash = 0; + unsigned long klen, i; -#ifdef __writeThrough - //if new hashtable was appended, remap shared memory block to adress space - if(db->old_mapped_size < db->shmem_info->shmem_size) + klen = strlen(key); + hash = KISSDB_hash(key, klen) % (uint64_t) db->htSize; + *(bytesWritten) = 0; + + if(db->htMappedSize < db->shared->htShmSize) { - db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp); - if(db->shmem_ht_fd < 0) - return KISSDB_ERROR_OPEN_SHM; - res = remapKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size,db->shmem_info->shmem_size); - if (res == Kdb_false) - return KISSDB_ERROR_REMAP_SHM; - db->old_mapped_size = db->shmem_info->shmem_size; + if ( Kdb_false == remapSharedHashtable(db->htFd, &db->hashTables, db->htMappedSize, db->shared->htShmSize)) + { + return KISSDB_ERROR_RESIZE_SHM; + } + else + { + db->htMappedSize = db->shared->htShmSize; + } } -#endif - lasthtoffset = htoffset = KISSDB_HEADER_SIZE; - cur_hash_table = db->hash_tables; //pointer to current hashtable in memory - for (i = 0; i < db->shmem_info->num_hash_tables; ++i) + hashTable = db->hashTables->slots; //pointer to current hashtable in memory + + //remap database file (only necessary here in writethrough mode) if in the meanwhile another process added new data (key value pairs / hashtables) to the file + if (db->dbMappedSize < db->shared->mappedDbSize) + { + db->mappedDb = mremap(db->mappedDb, db->dbMappedSize, db->shared->mappedDbSize, MREMAP_MAYMOVE); + if (db->mappedDb == MAP_FAILED) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":mremap error: !"), DLT_STRING(strerror(errno))); + return KISSDB_ERROR_IO; + } + db->dbMappedSize = db->shared->mappedDbSize; + } + + + for (i = 0; i < db->shared->htNum; ++i) { - offset = cur_hash_table[hash].offsetA; //fileoffset to data in file + offset = hashTable[hash].offsetA; //fileoffset to data in file if (offset >= KISSDB_HEADER_SIZE || offset < 0) //if a key with same hash is already in this slot or the same key must be overwritten { + if( abs(offset) > db->dbMappedSize ) + { + return KISSDB_ERROR_IO; + } + // if slot is marked as deleted, use this slot and negate the offset in order to reuse the existing data block if(offset < 0) { offset = -offset; //get original offset where data was deleted //printf("Overwriting slot for key: [%s] which was deleted before, offsetA: %d \n",key, offset); - if (lseek(db->fd, offset, SEEK_SET) == -1) //move filepointer to fileoffset where the key can be found - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, key, db->key_size) != db->key_size ) //write key - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, value, db->value_size) != db->value_size ) //write value - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } + writeDualDataBlock(db, offset, i, key, klen, value, valueSize); + hashTable[hash].offsetA = offset; //write the offset to the data in the memory-hashtable slot + offset += sizeof(DataBlock_s); + hashTable[hash].offsetB = offset; //write the offset to the second databloxk in the memory-hashtable slot + hashTable[hash].current = 0x00; + *(bytesWritten) = valueSize; - // write same key and value again here because slot was deleted an can be reused like an initial write -#ifdef __useBackups - if (write( db->fd, key, db->key_size) != db->key_size ) //write key - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, value, db->value_size) != db->value_size ) //write value - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } -#endif - //seek back to hashtbale slot - if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move to beginning of hashtable slot in file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - -#ifndef __useBackups - cur_hash_table[hash].offsetA = offset; //write the offset to the data in the memory-hashtable slot - if (write( db->fd, &offset, sizeof(int64_t)) != sizeof(int64_t) ) //write the offsetA to the data in the file-hashtable slot - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } -#endif - -#ifdef __useBackups - crc = 0x00; - crc = (uint32_t) pcoCrc32(crc, (unsigned char*)value, db->value_size); - cur_hash_table[hash].offsetA = offset; //write the offset to the data in the memory-hashtable slot - cur_hash_table[hash].checksumA = crc; - offset += (db->key_size + db->value_size); - cur_hash_table[hash].offsetB = offset; //write the offset to the data in the memory-hashtable slot - cur_hash_table[hash].checksumB = crc; - cur_hash_table[hash].current = 0x00; - - if (write( db->fd, &cur_hash_table[hash], sizeof(Hashtable_slot_s)) != sizeof(Hashtable_slot_s) ) //write updated data in the file-hashtable slot - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } -#endif - //fsync(db->fd) //associating a file stream with a synchronous file descriptor means that an fsync() call is not needed on the file descriptor after the fflush() - Kdb_unlock(&db->shmem_info->rwlock); return 0; /* success */ } - //overwrite existing if key matches - // if cur_hash_table[hash].current == 0x00 -> offsetA is latest so write to offsetB else offsetB is latest and write to offsetA -#ifdef __useBackups - if( cur_hash_table[hash].current == 0x00 ) - offset = cur_hash_table[hash].offsetA; //0x00 -> offsetA is latest - else - offset = cur_hash_table[hash].offsetB; //else offsetB is latest -#endif - if (lseek(db->fd, offset, SEEK_SET) == -1) //move filepointer to fileoffset where valid data can be found - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - - kptr = (const uint8_t *) key; //pointer to search key - klen = db->key_size; - while (klen) - { - n = (long) read(db->fd, tmp, (klen > sizeof(tmp)) ? sizeof(tmp) : klen); - if (n > 0) - { - if (memcmp(kptr, tmp, n)) //if search key does not match with key in file - goto put_no_match_next_hash_table; - kptr += n; - klen -= (unsigned long) n; - } - } - - //if key matches -> seek to currently non valid data block for this key -#ifdef __useBackups - if( cur_hash_table[hash].current == 0x00 ) - offset = cur_hash_table[hash].offsetB; // 0x00 -> offsetA is latest so write new data to offsetB which holds old data - else - offset = cur_hash_table[hash].offsetA; // offsetB is latest so write new data to offsetA which holds old data - - if (lseek(db->fd, offset, SEEK_SET) == -1)//move filepointer to fileoffset where backup data can be found - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, key, db->key_size) != db->key_size ) //write key - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } -#endif - if (write( db->fd, value, db->value_size) != db->value_size ) + offset = (hashTable[hash].current == 0x00) ? hashTable[hash].offsetA : hashTable[hash].offsetB; // if 0x00 -> offsetA is latest else offsetB is latest + block = (DataBlock_s*) (db->mappedDb + offset); + kptr = (const uint8_t*) key; //pointer to search key + if (klen > 0) { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - // seek back to slot in header for update of checksum and flag - if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move to beginning of hashtable slot in file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - - //generate crc for value - crc = 0x00; - crc = (uint64_t) pcoCrc32(crc, (unsigned char*)value, db->value_size); - current = 0x00; - Hashtable_slot_s slot = cur_hash_table[hash]; - - // check current flag and decide what parts of hashtable slot in file must be updated - if( cur_hash_table[hash].current == 0x00 ) //offsetA is latest -> modify settings of B - { - int seek = sizeof(slot.offsetA) + sizeof(slot.checksumA) + sizeof(slot.offsetB); - lseek(db->fd, seek , SEEK_CUR); //move to checksumB in file - if( write( db->fd, &crc, sizeof(uint64_t)) != sizeof(uint64_t)) //write checksumB to file + if (memcmp(kptr, block->key, klen) + || strlen(block->key) != klen) //if search key does not match with key in file { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; + //if key does not match -> search in next hashtable + bKeyFound = Kdb_false; } - current = 0x01; - if( write( db->fd, ¤t, sizeof(uint64_t)) != sizeof(uint64_t)) //write current to hashtbale slot + else { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; + bKeyFound = Kdb_true; } - cur_hash_table[hash].checksumB = crc; - cur_hash_table[hash].current = current; } - else //offsetB is latest -> modify settings of A + if(Kdb_true == bKeyFound) { + backupOffset = (hashTable[hash].current == 0x00) ? hashTable[hash].offsetB : hashTable[hash].offsetA; // if 0x00 -> offsetB is latest backup else offsetA is latest - int seek = sizeof(slot.offsetA); - lseek(db->fd, seek , SEEK_CUR); //move to checksumA in file - if( write( db->fd, &crc, sizeof(uint64_t)) != sizeof(uint64_t)) //write checksumA to file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - seek = sizeof(slot.offsetB) + sizeof(slot.checksumB);; - lseek(db->fd, seek , SEEK_CUR); //move to checksumA in file - current = 0x00; - if( write( db->fd, ¤t, sizeof(uint64_t)) != sizeof(uint64_t))//write current to hashtbale slot - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - cur_hash_table[hash].checksumA = crc; - cur_hash_table[hash].current = current; + //ALSO OVERWRITE LATEST VALID BLOCK to improve write amplification factor + block->delimStart = (offset < backupOffset) ? DATA_BLOCK_A_START_DELIMITER : DATA_BLOCK_B_START_DELIMITER; + block->valSize = valueSize; + memcpy(block->value,value, block->valSize); + block->htNum = i; + crc = 0x00; + crc = (uint32_t) pcoCrc32(crc, (unsigned char*)block->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t) ); + block->crc = crc; + block->delimEnd = (offset < backupOffset) ? DATA_BLOCK_A_END_DELIMITER : DATA_BLOCK_B_END_DELIMITER; + + //if key matches -> seek to currently non valid data block for this key + backupOffset = (hashTable[hash].current == 0x00) ? hashTable[hash].offsetB : hashTable[hash].offsetA; // if 0x00 -> offsetB is latest backup else offsetA is latest + backupBlock = (DataBlock_s*) (db->mappedDb + backupOffset); + //backupBlock->delimStart = DATA_BLOCK_START_DELIMITER; + backupBlock->delimStart = (backupOffset < offset) ? DATA_BLOCK_A_START_DELIMITER : DATA_BLOCK_B_START_DELIMITER; + backupBlock->valSize = valueSize; + memcpy(backupBlock->value,value, backupBlock->valSize); + backupBlock->htNum = i; + crc = 0x00; + crc = (uint32_t) pcoCrc32(crc, (unsigned char*)backupBlock->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t) ); + backupBlock->crc = crc; + //backupBlock->delimEnd = DATA_BLOCK_END_DELIMITER; + backupBlock->delimEnd = (backupOffset < offset) ? DATA_BLOCK_A_END_DELIMITER : DATA_BLOCK_B_END_DELIMITER; + // check current flag and decide what parts of hashtable slot in file must be updated + hashTable[hash].current = (hashTable[hash].current == 0x00) ? 0x01 : 0x00; // if 0x00 -> offsetA is latest -> set to 0x01 else /offsetB is latest -> modify settings of A set 0x00 + *(bytesWritten) = valueSize; + + return 0; //success } - Kdb_unlock(&db->shmem_info->rwlock); - return 0; //success } else //if key is not already inserted { /* add new data if an empty hash table slot is discovered */ - if (lseek(db->fd, 0, SEEK_END) == -1) //filepointer to the end of the file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - endoffset = lseek(db->fd, 0, SEEK_CUR); //filepointer position - if (write( db->fd, key, db->key_size) != db->key_size ) - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, value, db->value_size) != db->value_size ) + endoffset = db->shared->mappedDbSize; + if ( -1 == endoffset) //filepointer to the end of the file { - Kdb_unlock(&db->shmem_info->rwlock); return KISSDB_ERROR_IO; } - // write same key and value again here --> initial write -#ifdef __useBackups - if (write( db->fd, key, db->key_size) != db->key_size ) - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, value, db->value_size) != db->value_size ) + //truncate file -> data + backup block + if( ftruncate(db->fd, endoffset + ( sizeof(DataBlock_s) * 2) ) < 0) { - Kdb_unlock(&db->shmem_info->rwlock); return KISSDB_ERROR_IO; } -#endif - if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move filepointer to file-hashtable slot in file (offsetA) - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } -#ifndef __useBackups - if (write( db->fd, &endoffset, sizeof(int64_t)) != sizeof(int64_t) ) //write the offsetA to the data in the file-hashtable slot - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - cur_hash_table[hash].offsetA = endoffset; //write the offsetA to the data in the memory-hashtable slot -#endif -#ifdef __useBackups - crc = 0x00; - crc = (uint64_t) pcoCrc32(crc, (unsigned char*) value, db->value_size); - offset = endoffset + (db->key_size + db->value_size); - cur_hash_table[hash].offsetA = endoffset; //write the offsetA to the data in the memory-hashtable slot - cur_hash_table[hash].checksumA = crc; - cur_hash_table[hash].offsetB = offset; //write the offset to the data in the memory-hashtable slot - cur_hash_table[hash].checksumB = crc; - cur_hash_table[hash].current = 0x00; - current = 0x00; //current - - if (write( db->fd, &cur_hash_table[hash], sizeof(Hashtable_slot_s)) != sizeof(Hashtable_slot_s) ) //write updated data in the file-hashtable slot + db->mappedDb = mremap(db->mappedDb, db->dbMappedSize, db->shared->mappedDbSize + (sizeof(DataBlock_s) * 2), MREMAP_MAYMOVE); + if (db->mappedDb == MAP_FAILED) { - Kdb_unlock(&db->shmem_info->rwlock); + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":mremap error: !"),DLT_STRING(strerror(errno))); return KISSDB_ERROR_IO; } -#endif - //fsync(db->fd) // associating a file stream with a synchronous file descriptor means that an fsync() call is not needed on the file descriptor after the fflush() - Kdb_unlock(&db->shmem_info->rwlock); - return 0; /* success */ - } - put_no_match_next_hash_table: lasthtoffset = htoffset; - htoffset = cur_hash_table[db->hash_table_size].offsetA; // fileoffset to the next file-hashtable - cur_hash_table += (db->hash_table_size + 1); //pointer to the next memory-hashtable - } - /* if no existing slots, add a new page of hash table entries */ - if (lseek(db->fd, 0, SEEK_END) == -1) //Filepointer to the end of file - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if(db->shmem_info->num_hash_tables > 0) //only write delimiter if first hashtable has been written (first delimiter is written by open call) - { - if (write( db->fd, &delimiter, sizeof(delimiter)) != sizeof(delimiter) ) //write delimiter - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; + db->shared->mappedDbSize = db->shared->mappedDbSize + (sizeof(DataBlock_s) * 2); //shared info about database file size + db->dbMappedSize = db->shared->mappedDbSize; //local info about mapped size of file + + writeDualDataBlock(db, endoffset, i, key, klen, value, valueSize); + + //update hashtable entry + offset = endoffset + sizeof(DataBlock_s); + hashTable[hash].offsetA = endoffset; //write the offsetA to the data in the memory-hashtable slot + hashTable[hash].offsetB = offset; //write the offset to the data in the memory-hashtable slot + hashTable[hash].current = 0x00; + + *(bytesWritten) = valueSize; + return 0; /* success */ } + hashTable = (Hashtable_slot_s*) ((char*) hashTable + sizeof(Hashtable_s)); //pointer to the next memory-hashtable } - endoffset = lseek(db->fd, 0, SEEK_CUR); - //if new size would exceed old shared memory size-> allocate additional memory to shared memory (+ db->hash_table_size_bytes) - if( (db->hash_table_size_bytes * (db->shmem_info->num_hash_tables + 1)) > db->shmem_info->shmem_size) + //if new size would exceed old shared memory size for hashtables-> allocate additional memory to shared memory (+ db->htSizeBytes) + if( (db->htSizeBytes * (db->shared->htNum + 1)) > db->shared->htShmSize) { - if (db->shmem_ht_fd <= 0) + //munlockall(); + if (db->htFd <= 0) { - db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp); - if(db->shmem_ht_fd < 0) + db->htFd = kdbShmemOpen(db->htName, db->htMappedSize, &temp); + if(db->htFd < 0) + { return KISSDB_ERROR_OPEN_SHM; + } } - result = resizeKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->old_mapped_size + db->hash_table_size_bytes); + result = resizeKdbShmem(db->htFd, &db->hashTables, db->htMappedSize, db->htMappedSize + db->htSizeBytes); if (result == Kdb_false) { return KISSDB_ERROR_RESIZE_SHM; } else { - db->shmem_info->shmem_size = db->old_mapped_size + db->hash_table_size_bytes; - db->old_mapped_size = db->shmem_info->shmem_size; + db->shared->htShmSize = db->htMappedSize + db->htSizeBytes; + db->htMappedSize = db->shared->htShmSize; } + //mlockall(MCL_FUTURE); } - //if( currentHtOffset <= db->old_mapped_size / sizeof(Hashtable_slot_s) ) - cur_hash_table = &(db->hash_tables[(db->hash_table_size + 1) * db->shmem_info->num_hash_tables]); - //else - // return KISSDB_ERROR_ACCESS_VIOLATION; - memset(cur_hash_table, 0, db->hash_table_size_bytes); //hashtable init - cur_hash_table[hash].offsetA = endoffset + db->hash_table_size_bytes; /* where new entry will go (behind the new Ht that gets written)*/ - -#ifdef __useBackups - crc = 0x00; - crc = (uint64_t) pcoCrc32(crc, (unsigned char*)value, db->value_size); - cur_hash_table[hash].checksumA = crc; - cur_hash_table[hash].checksumB = crc; - cur_hash_table[hash].offsetB = cur_hash_table[hash].offsetA + (db->key_size + db->value_size);//write the offset to the data in the memory-hashtable slot - cur_hash_table[hash].current = 0x00; -#endif - - // write new hashtable at the end of the file - if (write( db->fd, cur_hash_table, db->hash_table_size_bytes) != db->hash_table_size_bytes ) - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - // write key behind new hashtable - if (write( db->fd, key, db->key_size) != db->key_size ) + /* if no existing slots, add a new page of hash table entries */ + endoffset = db->shared->mappedDbSize; + if ( -1 == endoffset) //filepointer to the end of the file { - Kdb_unlock(&db->shmem_info->rwlock); return KISSDB_ERROR_IO; } - // write value behind key - if (write( db->fd, value, db->value_size) != db->value_size ) + //truncate file in order to save new hashtable + two Datablocks (this does not modify filedescriptor) + if (ftruncate(db->fd, endoffset + db->htSizeBytes + (sizeof(DataBlock_s) * 2)) < 0) { - Kdb_unlock(&db->shmem_info->rwlock); return KISSDB_ERROR_IO; } - // write same key and value again here --> initial write -#ifdef __useBackups - if (write( db->fd, key, db->key_size) != db->key_size ) + + db->mappedDb = mremap(db->mappedDb, db->dbMappedSize, db->shared->mappedDbSize + (db->htSizeBytes + (sizeof(DataBlock_s) * 2)), MREMAP_MAYMOVE); + if (db->mappedDb == MAP_FAILED) { - Kdb_unlock(&db->shmem_info->rwlock); + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":mremap error: !"),DLT_STRING(strerror(errno))); return KISSDB_ERROR_IO; } - if (write( db->fd, value, db->value_size) != db->value_size ) + db->shared->mappedDbSize = db->shared->mappedDbSize + (db->htSizeBytes + (sizeof(DataBlock_s) * 2)); + db->dbMappedSize = db->shared->mappedDbSize; + + //prepare new hashtable in shared memory + hashtable = &(db->hashTables[db->shared->htNum]); + memset(hashtable, 0, db->htSizeBytes); //hashtable init + hashtable->delimStart = HASHTABLE_START_DELIMITER; + hashtable->delimEnd = HASHTABLE_END_DELIMITER; + hashtable->crc = 0x00; + hashTable = hashtable->slots; //pointer to the next memory-hashtable + hashTable[hash].offsetA = endoffset + db->htSizeBytes; /* where new entry will go (behind the new Ht that gets written)*/ + hashTable[hash].offsetB = hashTable[hash].offsetA + sizeof(DataBlock_s);//write the offset to the data in the memory-hashtable slot + hashTable[hash].current = 0x00; + + htptr = (Hashtable_s*) (db->mappedDb + endoffset); + //copy hashtable in shared memory to mapped hashtable in file + memcpy(htptr, hashtable, db->htSizeBytes); + //write data behind new hashtable + writeDualDataBlock(db, endoffset + db->htSizeBytes, db->shared->htNum, key, klen, value, valueSize); + //if a hashtable exists, update link to new hashtable in previous hashtable + if (db->shared->htNum) { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; + db->hashTables[db->shared->htNum -1].slots[HASHTABLE_SLOT_COUNT].offsetA = endoffset; //update link to new hashtable in previous hashtable } -#endif + ++db->shared->htNum; + + *(bytesWritten) = valueSize; - //if a hashtable exists, update link to new hashtable - if (db->shmem_info->num_hash_tables) - { - if (lseek(db->fd, lasthtoffset + (sizeof(Hashtable_slot_s) * db->hash_table_size), SEEK_SET) == -1) - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - if (write( db->fd, &endoffset, sizeof(int64_t)) != sizeof(int64_t) ) - { - Kdb_unlock(&db->shmem_info->rwlock); - return KISSDB_ERROR_IO; - } - db->hash_tables[((db->hash_table_size + 1) * (db->shmem_info->num_hash_tables - 1)) + db->hash_table_size].offsetA = endoffset; //update link to new hashtable in old hashtable - } - ++db->shmem_info->num_hash_tables; - //fsync(db->fd) - Kdb_unlock(&db->shmem_info->rwlock); return 0; /* success */ } @@ -1242,34 +1280,25 @@ int KISSDB_put(KISSDB *db, const void *key, const void *value) /* * prints the offsets stored in the shared Hashtable */ -void printSharedHashtable(KISSDB *db) +void printSharedHashtable(KISSDB* db) { - Hashtable_slot_s *cur_hash_table; - cur_hash_table = db->hash_tables; - unsigned long k; - unsigned long x = (db->hash_table_size * db->shmem_info->num_hash_tables); - //printf("Address of SHARED HT_NUMBER: %p \n", &db->shmem_info->num_hash_tables); - printf("Address of SHARED HEADER: %p \n", &cur_hash_table); - Header_s* ptr; - printf("HT Struct sizes: %d, %d, %d, %d,%d, %d, %d, %d\n", sizeof(ptr->KdbV), sizeof(ptr->checksum), sizeof(ptr->closeFailed), sizeof(ptr->closeOk), sizeof(ptr->hash_table_size),sizeof(ptr->key_size),sizeof(ptr->value_size),sizeof(ptr->delimiter)); - printf("HEADER SIZE: %d \n", sizeof(Header_s)); - printf("Hashtable_slot_s SIZE: %d \n", sizeof(Hashtable_slot_s)); - for (k = 0; k < x; k++) + unsigned long k=0; + int i = 0; + + for(i =0 ; i< db->shared->htNum; i++) { - if (db->hash_tables[k].offsetA != 0) + for (k = 0; k <= db->htSize; k++) { - printf("offsetA [%lu]: %" PRId64 " \n", k, db->hash_tables[k].offsetA); - printf("checksumA[%lu]: %" PRIu64 " \n", k, db->hash_tables[k].checksumA); - printf("offsetB [%lu]: %" PRId64 " \n", k, db->hash_tables[k].offsetB); - printf("checksumB[%lu]: %" PRIu64 " \n", k, db->hash_tables[k].checksumB); - printf("current [%lu]: %" PRIu64 " \n", k, db->hash_tables[k].current); + printf("ht[%d] offsetA [%lu]: %" PRId64 " \n",i, k, db->hashTables[i].slots[k].offsetA); + printf("ht[%d] offsetB [%lu]: %" PRId64 " \n",i, k, db->hashTables[i].slots[k].offsetB); + printf("ht[%d] current [%lu]: %" PRIu64 " \n",i, k, db->hashTables[i].slots[k].current); } } } #endif -void KISSDB_Iterator_init(KISSDB *db, KISSDB_Iterator *dbi) +void KISSDB_Iterator_init(KISSDB* db, KISSDB_Iterator* dbi) { dbi->db = db; dbi->h_no = 0; // number of read hashtables @@ -1277,235 +1306,953 @@ void KISSDB_Iterator_init(KISSDB *db, KISSDB_Iterator *dbi) } -int KISSDB_Iterator_next(KISSDB_Iterator *dbi, void *kbuf, void *vbuf) +int KISSDB_Iterator_next(KISSDB_Iterator* dbi, void* kbuf, void* vbuf) { + DataBlock_s* block; + Hashtable_slot_s* ht; int64_t offset; - Kdb_rdlock(&dbi->db->shmem_info->rwlock); - if ((dbi->h_no < (dbi->db->shmem_info->num_hash_tables)) && (dbi->h_idx < dbi->db->hash_table_size)) + if(dbi->db->htMappedSize < dbi->db->shared->htShmSize) + { + if ( Kdb_false == remapSharedHashtable(dbi->db->htFd, &dbi->db->hashTables, dbi->db->htMappedSize, dbi->db->shared->htShmSize)) + { + return KISSDB_ERROR_RESIZE_SHM; + } + else + { + dbi->db->htMappedSize = dbi->db->shared->htShmSize; + } + } + + //remap database file if in the meanwhile another process added new data (key value pairs / hashtables) to the file + if (dbi->db->dbMappedSize < dbi->db->shared->mappedDbSize) + { + dbi->db->mappedDb = mremap(dbi->db->mappedDb, dbi->db->dbMappedSize, dbi->db->shared->mappedDbSize, MREMAP_MAYMOVE); + if (dbi->db->mappedDb == MAP_FAILED) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(":mremap error: !"), DLT_STRING(strerror(errno))); + return KISSDB_ERROR_IO; + } + dbi->db->dbMappedSize = dbi->db->shared->mappedDbSize; + } + + if ((dbi->h_no < (dbi->db->shared->htNum)) && (dbi->h_idx < dbi->db->htSize)) { - //TODO check for currently valid data block flag and use this offset instead of offsetA - while (!(offset = dbi->db->hash_tables[((dbi->db->hash_table_size + 1) * dbi->h_no) + dbi->h_idx].offsetA)) + ht = dbi->db->hashTables[dbi->h_no].slots; //pointer to first hashtable + + while ( !(ht[dbi->h_idx].offsetA || ht[dbi->h_idx].offsetB) ) //until a offset was found { - if (++dbi->h_idx >= dbi->db->hash_table_size) + if (++dbi->h_idx >= dbi->db->htSize) { dbi->h_idx = 0; - if (++dbi->h_no >= (dbi->db->shmem_info->num_hash_tables)) + if (++dbi->h_no >= (dbi->db->shared->htNum)) { - Kdb_unlock(&dbi->db->shmem_info->rwlock); return 0; } + else + { + ht = dbi->db->hashTables[dbi->h_no].slots; //next hashtable + } } } - - if (lseek(dbi->db->fd, offset, SEEK_SET) == -1) - return KISSDB_ERROR_IO; - if (read(dbi->db->fd, kbuf, dbi->db->key_size) != dbi->db->key_size) - return KISSDB_ERROR_IO; - if (vbuf != NULL) + if(ht[dbi->h_idx].current == 0x00) { - if (read(dbi->db->fd, vbuf, dbi->db->value_size) != dbi->db->value_size) - return KISSDB_ERROR_IO; + offset = ht[dbi->h_idx].offsetA; } else { - if (lseek(dbi->db->fd, dbi->db->value_size, SEEK_CUR) == -1) - return KISSDB_ERROR_IO; + offset = ht[dbi->h_idx].offsetB; + } + + if( abs(offset) > dbi->db->dbMappedSize ) + { + return KISSDB_ERROR_IO; } - if (++dbi->h_idx >= dbi->db->hash_table_size) + block = (DataBlock_s*) (dbi->db->mappedDb + offset); + memcpy(kbuf,block->key, dbi->db->keySize); + if (vbuf != NULL) + { + memcpy(vbuf, block->value, dbi->db->valSize); + } + if (++dbi->h_idx >= dbi->db->htSize) { dbi->h_idx = 0; ++dbi->h_no; } - Kdb_unlock(&dbi->db->shmem_info->rwlock); return 1; } - Kdb_unlock(&dbi->db->shmem_info->rwlock); return 0; } -int readHeader(KISSDB* db, uint16_t* hash_table_size, uint64_t* key_size, uint64_t* value_size) + +int readHeader(KISSDB* db, uint16_t* htSize, uint64_t* keySize, uint64_t* valSize) { - //set Filepointer to the beginning of the file - if (lseek(db->fd, 0, SEEK_SET) == -1) - return KISSDB_ERROR_IO; - //mmap header from beginning of file - int mapFlag = PROT_WRITE | PROT_READ; Header_s* ptr = 0; - ptr = (Header_s*) mmap(NULL, KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0); - if (ptr == MAP_FAILED) - return KISSDB_ERROR_IO; + ptr = (Header_s*) db->mappedDb; - if ((ptr->KdbV[0] != 'K') || (ptr->KdbV[1] != 'd') || (ptr->KdbV[2] != 'B') || (ptr->KdbV[3] != KISSDB_VERSION)) + if ((ptr->KdbV[0] != 'K') || (ptr->KdbV[1] != 'd') || (ptr->KdbV[2] != 'B') ) + { return KISSDB_ERROR_CORRUPT_DBFILE; - - if (!ptr->hash_table_size) + } + if( (ptr->KdbV[3] != KISSDB_MAJOR_VERSION) || (ptr->KdbV[4] != '.') || (ptr->KdbV[5] != KISSDB_MINOR_VERSION)) + { + return KISSDB_ERROR_WRONG_DATABASE_VERSION; + } + if (!ptr->htSize) + { return KISSDB_ERROR_CORRUPT_DBFILE; - (*hash_table_size) = (uint16_t) ptr->hash_table_size; - - if (!ptr->key_size) + } + (*htSize) = (uint16_t) ptr->htSize; + if (!ptr->keySize) + { return KISSDB_ERROR_CORRUPT_DBFILE; - (*key_size) = (uint64_t) ptr->key_size; + } + (*keySize) = (uint64_t) ptr->keySize; - if (!ptr->value_size) + if (!ptr->valSize) + { return KISSDB_ERROR_CORRUPT_DBFILE; - (*value_size) = (uint64_t) ptr->value_size; - - //sync changes with file - if (0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE)) - return KISSDB_ERROR_IO; - - //unmap memory - if (0 != munmap(ptr, KISSDB_HEADER_SIZE)) - return KISSDB_ERROR_IO; + } + (*valSize) = (uint64_t) ptr->valSize; return 0; } -int writeHeader(KISSDB* db, uint16_t* hash_table_size, uint64_t* key_size, uint64_t* value_size) +int writeHeader(KISSDB* db, uint16_t* htSize, uint64_t* keySize, uint64_t* valSize) { Header_s* ptr = 0; int ret= 0; - //Seek to beginning of file - if (lseek(db->fd, 0, SEEK_SET) == -1) - return KISSDB_ERROR_IO; - - //ftruncate file to needed size for header + //truncate file to needed size for header ret = ftruncate(db->fd, KISSDB_HEADER_SIZE); if (ret < 0) + { return KISSDB_ERROR_IO; - - //mmap header from beginning of file - int mapFlag = PROT_WRITE | PROT_READ; - ptr = (Header_s*) mmap(NULL, KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0); - if (ptr == MAP_FAILED) + } + //mmap whole file for the first time + db->mappedDb = (void*) mmap(NULL, KISSDB_HEADER_SIZE, PROT_WRITE | PROT_READ, MAP_SHARED, db->fd, 0); + if (db->mappedDb == MAP_FAILED) + { return KISSDB_ERROR_IO; + } + db->shared->mappedDbSize = KISSDB_HEADER_SIZE; + db->dbMappedSize = KISSDB_HEADER_SIZE; + ptr = (Header_s*) db->mappedDb; ptr->KdbV[0] = 'K'; ptr->KdbV[1] = 'd'; ptr->KdbV[2] = 'B'; - ptr->KdbV[3] = KISSDB_VERSION; - ptr->KdbV[4] = '-'; - ptr->KdbV[5] = '-'; - ptr->KdbV[6] = '-'; - ptr->KdbV[7] = '-'; + ptr->KdbV[3] = KISSDB_MAJOR_VERSION; + ptr->KdbV[4] = '.'; + ptr->KdbV[5] = KISSDB_MINOR_VERSION; + ptr->KdbV[6] = '0'; + ptr->KdbV[7] = '0'; ptr->checksum = 0x00; ptr->closeFailed = 0x00; //remove closeFailed flag ptr->closeOk = 0x01; //set closeOk flag - ptr->hash_table_size = (uint64_t)(*hash_table_size); - ptr->key_size = (uint64_t)(*key_size); - ptr->value_size = (uint64_t)(*value_size); - memcpy(ptr->delimiter,"||||||||", 8); - - //sync changes with file - if (0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE)) - return KISSDB_ERROR_IO; + ptr->htSize = (uint64_t)(*htSize); + ptr->keySize = (uint64_t)(*keySize); + ptr->valSize = (uint64_t)(*valSize); + msync(db->mappedDb, KISSDB_HEADER_SIZE, MS_SYNC); - //unmap memory - if (0 != munmap(ptr, KISSDB_HEADER_SIZE)) - return KISSDB_ERROR_IO; return 0; } int checkErrorFlags(KISSDB* db) { - //mmap header from beginning of file - int mapFlag = PROT_WRITE | PROT_READ; - Header_s* ptr = 0; - ptr = (Header_s*) mmap(NULL, KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0); - if (ptr == MAP_FAILED) - return KISSDB_ERROR_IO; - //uint64_t crc = 0; + Header_s* ptr; + ptr = (Header_s*) db->mappedDb; -#ifdef __checkerror - //check if closeFailed flag is set - if(ptr->closeFailed == 0x01) + //check if closeFailed flag is set or closeOk is not set + if(ptr->closeFailed == 0x01 || ptr->closeOk == 0x00 ) { - //TODO implement verifyHashtableCS +#ifdef PFS_TEST + printf("CHECK ERROR FLAGS: CLOSE FAILED!\n"); +#endif + ptr->closeFailed = 0x01; //create close failed flags + ptr->closeOk = 0x00; + return KISSDB_ERROR_CORRUPT_DBFILE; + } + else + { +#ifdef PFS_TEST + printf("CHECK ERROR FLAGS: CLOSE OK!\n"); +#endif + ptr->closeFailed = 0x01; //NO: create close failed flag + ptr->closeOk = 0x00; + } + msync(db->mappedDb, KISSDB_HEADER_SIZE, MS_SYNC); - //if closeFailed flag is set, something went wrong at last close -> so check crc - db->shmem_info->crc_invalid = Kdb_true; //check crc for further reads + return 0; +} -#if 0 - DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB -> closeFailed flag is set: "), DLT_UINT64(ptr->closeFailed)); - crc = (uint64_t) pcoCalcCrc32Csum(db->fd, sizeof(Header_s)); - if(ptr->checksum != 0) //do not check if database is currently in creation + +int verifyHashtableCS(KISSDB* db) +{ + char* ptr; + Hashtable_s* hashtable; + int i = 0; + int ptrOffset=1; + int64_t offset = 0; + struct stat statBuf; + uint64_t crc = 0; + void* memory; + + if (db->fd) + { + //map entire file into memory + fstat(db->fd, &statBuf); + memory = (void*) mmap(NULL, statBuf.st_size, PROT_WRITE | PROT_READ, MAP_SHARED, db->fd, 0); + if (memory == MAP_FAILED) + { + return KISSDB_ERROR_IO; + } + ptr = (char*) memory; + db->shared->htNum = 0; + //unmap previously allocated and maybe corrupted hashtables + munmap(db->hashTables, db->htMappedSize); + db->hashTables = (Hashtable_s*) getKdbShmemPtr(db->htFd, db->htSizeBytes); + if(db->hashTables == ((void*) -1)) + { + return KISSDB_ERROR_MAP_SHM; + } + db->htMappedSize = db->htSizeBytes; //size for first hashtable + + //determine greatest common factor of hashtable and datablock used for pointer incrementation + ptrOffset = greatestCommonFactor(sizeof(DataBlock_s), sizeof(Hashtable_s) ); + + //offsets in mapped area to first hashtable + offset = sizeof(Header_s); + ptr += offset; + + //get number of hashtables in file (search for hashtable delimiters) and copy the hashtables to memory + while (offset <= (statBuf.st_size - db->htSizeBytes)) //begin with offset for first hashtable { - if (crc != ptr->checksum) + hashtable = (Hashtable_s*) ptr; + //if at least one of two hashtable delimiters are found + if (hashtable->delimStart == HASHTABLE_START_DELIMITER + || hashtable->delimEnd == HASHTABLE_END_DELIMITER) { - DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CHECKSUM IN HEADER : "), DLT_UINT64(ptr->checksum), DLT_STRING(" != CHECKSUM CALCULATED: "), DLT_UINT64(crc)); - //db->shmem_info->crc_invalid = Kdb_true; //check datablocks at further reads - //return KISSDB_ERROR_CORRUPT_DBFILE; //previous close failed and checksum invalid -> error state -> return error + //next hashtable to use + //rewrite delimiters to make sure that both exist + hashtable->delimStart = HASHTABLE_START_DELIMITER; + hashtable->delimEnd = HASHTABLE_END_DELIMITER; + Kdb_bool result = Kdb_false; + //if new size would exceed old shared memory size-> allocate additional memory page to shared memory + if (db->htSizeBytes * (db->shared->htNum + 1) > db->htMappedSize) + { + result = resizeKdbShmem(db->htFd, &db->hashTables, db->htMappedSize, db->htMappedSize + db->htSizeBytes); + if (result == Kdb_false) + { + return KISSDB_ERROR_RESIZE_SHM; + } + else + { + db->shared->htShmSize = db->htMappedSize + db->htSizeBytes; + db->htMappedSize = db->shared->htShmSize; + } + } + // copy the current hashtable read from file to (htadress + (htsize * htcount)) in memory + memcpy(((uint8_t*) db->hashTables) + (db->htSizeBytes * db->shared->htNum), ptr, db->htSizeBytes); + ++db->shared->htNum; + + //jump to next data block after hashtable + offset += sizeof(Hashtable_s); + ptr += sizeof(Hashtable_s); } else { - DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CECHKSUM IN HEADER: "), DLT_UINT64(ptr->checksum), DLT_STRING(" == CHECKSUM CALCULATED: "), DLT_UINT64(crc)); - //db->shmem_info->crc_invalid = Kdb_false; //do not check datablocks at further reads + offset += ptrOffset; + ptr += ptrOffset; } } - else - DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("Do not check checksum, database in creation: "), DLT_STRING(db->shmem_ht_name)); + munmap(memory, statBuf.st_size); + + //check CRC of all found hashtables + if (db->shared->htNum > 0) + { + for (i = 0; i < db->shared->htNum; i++) + { + crc = 0; + crc = (uint64_t) pcoCrc32(crc, (unsigned char*) db->hashTables[i].slots, sizeof(db->hashTables[i].slots)); + if (db->hashTables[i].crc != crc) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(": Checksum of hashtable number: <"); DLT_INT(i); DLT_STRING("> is invalid")); +#ifdef PFS_TEST + printf("VERIFY HASHTABLE: hashtable #%d: CHECKSUM INVALID! \n",i); #endif + return -1; //start rebuild of hashtables + } + else + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_DEBUG, DLT_STRING(__FUNCTION__); DLT_STRING(": Checksum of hashtable number: <"); DLT_INT(i); DLT_STRING("> is OK")); + +#ifdef PFS_TEST + printf("VERIFY HASHTABLE: hashtable #%d: CHECKSUM OK! \n",i); +#endif + } + } + } } - else + return 0; +} + +int rebuildHashtables(KISSDB* db) +{ + char* ptr; + DataBlock_s* data; + DataBlock_s* dataA; + DataBlock_s* dataB; + Hashtable_s* hashtable; + int current = 0; + int ptrOffset = 1; + int64_t offset=0; + int64_t offsetA = 0; + struct stat statBuf; + uint64_t crc = 0; + uint64_t calcCrcA, calcCrcB, readCrcA, readCrcB; + void* memory; + + fstat(db->fd, &statBuf); + memory = (char*) mmap(NULL, statBuf.st_size, PROT_WRITE | PROT_READ, MAP_SHARED, db->fd, 0); + if (memory == MAP_FAILED) { - //DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB: closeFailed flag is not set: "), DLT_UINT64(ptr->closeFailed)); - ptr->closeFailed = 0x01; //NO: create close failed flag + return KISSDB_ERROR_IO; } + ptr = (char*) memory; + + //recover all hashtables of database + if (db->shared->htNum > 0) //htNum was determined in verifyhashtables() -> no reallocation is needed + { + ptr = (void*) memory; + memset(db->hashTables, 0, db->shared->htNum * sizeof(Hashtable_s)); + db->hashTables[0].delimStart = HASHTABLE_START_DELIMITER; + db->hashTables[0].delimEnd = HASHTABLE_END_DELIMITER; + + ptrOffset = greatestCommonFactor(sizeof(DataBlock_s), sizeof(Hashtable_s) ); + + //begin searching after first hashtable + offset = sizeof(Header_s) + sizeof(Hashtable_s); + ptr += offset; + //go through database file until offset + Datablock size reaches end of file mapping + while (offset <= (statBuf.st_size - sizeof(DataBlock_s))) + { + data = (DataBlock_s*) ptr; - //check if closeOk flag is set - if(ptr->closeOk == 0x01) + //if block A start or end delimiters were found + if (data->delimStart == DATA_BLOCK_A_START_DELIMITER + || data->delimEnd == DATA_BLOCK_A_END_DELIMITER) + { + //calculate checksum of Block A + calcCrcA = 0; + calcCrcA = (uint64_t) pcoCrc32(calcCrcA, (unsigned char*) data->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + readCrcA = data->crc; + + //search for block B start delimiter + offset += sizeof(DataBlock_s); + ptr += sizeof(DataBlock_s); + dataB = (DataBlock_s*) ptr; + if (dataB->delimStart == DATA_BLOCK_B_START_DELIMITER + || dataB->delimEnd == DATA_BLOCK_B_END_DELIMITER) + { + //verify checksum of Block B + calcCrcB = 0; + calcCrcB = (uint64_t) pcoCrc32(calcCrcB, (unsigned char*) dataB->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + readCrcB = dataB->crc; + if (readCrcB == calcCrcB) //checksum of block B matches + { + if (readCrcA == calcCrcA) //checksum of block A matches + { + if (1) //decide which datablock has latest written data - still statically using Block B for recovery because both blocks are written in kissdb_put + { + offsetA = offset - sizeof(DataBlock_s); + rebuildWithBlockB(dataB, db, offsetA, offset); + } + else + { + // use block A for rebuild + //write offsets for block a and block B + offsetA = offset - sizeof(DataBlock_s); + rebuildWithBlockA(data, db, offsetA, offset); + } + } + else //checksum of block A does not match, but checksum of block B was valid + { + // use block B for rebuild + offsetA = offset - sizeof(DataBlock_s); + rebuildWithBlockB(dataB, db, offsetA, offset); + } + } + else + { + if (readCrcA == calcCrcA) + { + // use block A for rebuild + //write offsets for block a and block B + offsetA = offset - sizeof(DataBlock_s); + rebuildWithBlockA(data, db, offsetA, offset); + } + else //checksum of block A and of Block B do not match ---> worst case scenario + { + invalidateBlocks(data, dataB, db); + } + } + } + else + { + //if checksum A matches and block B was not found + if (readCrcA == calcCrcA) + { + // use block A for rebuild + //write offsets for block a and block B + offsetA = offset - sizeof(DataBlock_s); + rebuildWithBlockA(data, db, offsetA, offset); + } + else //checksum of block A does not match and block B was not found + { + invalidateBlocks(data, dataB, db); + } + } + //jump behind datablock B + offset += sizeof(DataBlock_s); + ptr += sizeof(DataBlock_s); + } + //If a Bock B start or end delimiters were found: this only can happen if previous Block A was not found + else if (data->delimStart == DATA_BLOCK_B_START_DELIMITER + || data->delimEnd == DATA_BLOCK_B_END_DELIMITER) + { + dataA = (DataBlock_s*) (ptr - sizeof(DataBlock_s)) ; + //verify checksum of Block B + crc = 0; + crc = (uint64_t) pcoCrc32(crc, (unsigned char*) data->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + if (data->crc == crc) + { + //use block B for rebuild + //write offsets for block A and block B + offsetA = offset - sizeof(DataBlock_s); + rebuildWithBlockB(data, db, offsetA, offset); + } + else + { + invalidateBlocks(dataA, data, db); + } + //jump behind datablock B + offset += sizeof(DataBlock_s); + ptr += sizeof(DataBlock_s); + } + else if (data->delimStart == DATA_BLOCK_A_DELETED_START_DELIMITER + || data->delimEnd == DATA_BLOCK_A_DELETED_END_DELIMITER) + { + //calculate checksum of Block A + calcCrcA = 0; + calcCrcA = (uint64_t) pcoCrc32(calcCrcA, (unsigned char*) data->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + readCrcA = data->crc; + + //search for block B start delimiter + offset += sizeof(DataBlock_s); + ptr += sizeof(DataBlock_s); + dataB = (DataBlock_s*) ptr; + + if (dataB->delimStart == DATA_BLOCK_B_DELETED_START_DELIMITER + || dataB->delimEnd == DATA_BLOCK_B_DELETED_END_DELIMITER) + { + //calculate checksum of Block B + calcCrcB = 0; + calcCrcB = (uint64_t) pcoCrc32(calcCrcB, (unsigned char*) dataB->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + readCrcB = dataB->crc; + if (readCrcB == calcCrcB) //checksum of block B matches + { + offsetA = offset - sizeof(DataBlock_s); + invertBlockOffsets(data, db, offsetA, offset); + } + else + { + if (readCrcA == calcCrcA) + { + offsetA = offset - sizeof(DataBlock_s); + invertBlockOffsets(data, db, offsetA, offset); + } + else + { + invalidateBlocks(data, dataB, db); + } + } + } + else //NO BLOCK B Found + { + if (readCrcA == calcCrcA) + { + + offsetA = offset - sizeof(DataBlock_s); + invertBlockOffsets(data, db, offsetA, offset); + } + else + { + invalidateBlocks(data, dataB, db); + } + } + //jump behind datablock B + offset += sizeof(DataBlock_s); + ptr += sizeof(DataBlock_s); + } + else if (data->delimStart == DATA_BLOCK_B_DELETED_START_DELIMITER + || data->delimEnd == DATA_BLOCK_B_DELETED_END_DELIMITER) + { + crc = 0; + crc = (uint64_t) pcoCrc32(crc, (unsigned char*) data->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + if (crc == data->crc) + { + offsetA = offset - sizeof(DataBlock_s); + invertBlockOffsets(data, db, offsetA, offset); + } + else + { + dataA = (DataBlock_s*) (ptr - sizeof(DataBlock_s)) ; + invalidateBlocks(dataA, data, db); + } + //jump behind datablock B + offset += sizeof(DataBlock_s); + ptr += sizeof(DataBlock_s); + } + else if( offset <= (statBuf.st_size - sizeof(Hashtable_s)) ) //check if ptr range for hashtable is within mapping of file + { + hashtable = (Hashtable_s*) ptr; + + if (hashtable->delimStart == HASHTABLE_START_DELIMITER + || hashtable->delimEnd == HASHTABLE_END_DELIMITER) + { + //next hashtable to use + db->hashTables[current].slots[db->htSize].offsetA = offset; //update link to next hashtable in current hashtable + current++; + if (current < db->shared->htNum) + { + db->hashTables[current].delimStart = HASHTABLE_START_DELIMITER; + db->hashTables[current].delimEnd = HASHTABLE_END_DELIMITER; + } + else + { + return -1; + } + offset += sizeof(Hashtable_s); + ptr += sizeof(Hashtable_s); + } + else //if no hashtable is found + { + offset += ptrOffset; + ptr += ptrOffset; + } + } + else //if nothing is found for offsets in -> (filesize - hashtablesize) area + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(": No Datablock or hashtable area found!")); + //increment pointer by greatest common factor of hashtable size and datablock size + offset += ptrOffset; + ptr += ptrOffset; + } + } + } + msync(memory, statBuf.st_size, MS_SYNC | MS_INVALIDATE); + munmap(memory, statBuf.st_size); + return 0; +} + + +//invalidate block content for A and B +//this block can never be found / overwritten again +//new insertions can reuse hashtable entry but block is added at EOF +void invalidateBlocks(DataBlock_s* dataA, DataBlock_s* dataB, KISSDB* db) +{ + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": Datablock recovery for key: <"); DLT_STRING(dataA->key); DLT_STRING("> impossible: both datablocks are invalid!")); + + memset(dataA->key, 0, db->keySize); + memset(dataA->value, 0, db->valSize); + dataA->crc=0; + dataA->htNum = 0; + dataA->valSize = 0; + + memset(dataB->key, 0, db->keySize); + memset(dataB->value, 0, db->valSize); + dataB->crc=0; + dataB->htNum = 0; + dataB->valSize = 0; +} + + +void invertBlockOffsets(DataBlock_s* data, KISSDB* db, int64_t offsetA, int64_t offsetB) +{ + uint64_t hash = 0; + hash = KISSDB_hash(data->key, strlen(data->key)) % (uint64_t) db->htSize; + //invert offsets for deleted block A + db->hashTables[data->htNum].slots[hash].offsetA = - offsetA; + //invert offsets for deleted block B + db->hashTables[data->htNum].slots[hash].offsetB = - offsetB; + //reset current flag + db->hashTables[data->htNum].slots[hash].current = 0x00; +} + + +void rebuildWithBlockB(DataBlock_s* data, KISSDB* db, int64_t offsetA, int64_t offsetB) +{ + uint64_t hash = KISSDB_hash(data->key, strlen(data->key)) % (uint64_t) db->htSize; + //write offsets for block A and block B + db->hashTables[data->htNum].slots[hash].offsetA = offsetA; + db->hashTables[data->htNum].slots[hash].offsetB = offsetB; + //set block B as current + db->hashTables[data->htNum].slots[hash].current = 0x01; + + /* + DLT_LOG(persComLldbDLTCtx, DLT_LOG_DEBUG, DLT_STRING(__FUNCTION__); DLT_STRING(": Rebuild in hashtable No. <"); DLT_INT(data->htNum); + DLT_STRING("> with Datablock B for key: <"); DLT_STRING(data->key); DLT_STRING("- hash: <"); DLT_INT(hash); DLT_STRING("> - OffsetA: <"); DLT_INT(offsetA); + DLT_STRING("> - OffsetB: <"); DLT_INT(offsetB)); + */ +} + + +void rebuildWithBlockA(DataBlock_s* data, KISSDB* db, int64_t offsetA, int64_t offsetB) +{ + uint64_t hash = KISSDB_hash(data->key, strlen(data->key)) % (uint64_t) db->htSize; + //write offsets for block A and block B + db->hashTables[data->htNum].slots[hash].offsetA = offsetA; + db->hashTables[data->htNum].slots[hash].offsetB = offsetB; + //set block B as current + db->hashTables[data->htNum].slots[hash].current = 0x00; + + /* + DLT_LOG(persComLldbDLTCtx, DLT_LOG_DEBUG, DLT_STRING(__FUNCTION__); DLT_STRING(": Rebuild in hashtable No. <"); DLT_INT(data->htNum); + DLT_STRING("> with Datablock A for key: <"); DLT_STRING(data->key); DLT_STRING("- hash: <"); DLT_INT(hash); DLT_STRING("> - OffsetA: <"); DLT_INT(offsetA); + DLT_STRING("> - OffsetB: <"); DLT_INT(offsetB)); + */ +} + + + + +int recoverDataBlocks(KISSDB* db) +{ + +#ifdef PFS_TEST + printf("DATABLOCK RECOVERY: START! \n"); +#endif + + char* ptr; + DataBlock_s* data; + int i = 0; + int k = 0; + int64_t offset = 0; + struct stat statBuf; + uint64_t crc = 0; + void* memory; + + fstat(db->fd, &statBuf); + memory = (char*) mmap(NULL, statBuf.st_size, PROT_WRITE | PROT_READ, MAP_SHARED, db->fd, 0); + if (memory == MAP_FAILED) { - //DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB -> closeOk flag is set: "), DLT_UINT64(ptr->closeOk)); - ptr->closeOk = 0x00; + return KISSDB_ERROR_IO; } - else + ptr = (char*) memory; + + //go through all hashtables and jump to data blocks for crc validation + if (db->shared->htNum > 0) { - //if closeOK is not set , something went wrong at last close - db->shmem_info->crc_invalid = Kdb_true; //do crc check at read + ptr = (void*) memory; + for (i = 0; i < db->shared->htNum; i++) + { + for (k = 0; k < HASHTABLE_SLOT_COUNT; k++) + { + if (db->hashTables[i].slots[k].offsetA > 0) //ignore deleted or unused slots + { + ptr = (void*) memory; //reset pointer + //current valid data is offset A or offset B? + offset = (db->hashTables[i].slots[k].current == 0x00) ? db->hashTables[i].slots[k].offsetA : db->hashTables[i].slots[k].offsetB; + ptr += offset; //set pointer to current valid datablock + data = (DataBlock_s*) ptr; + //check crc of data block marked as current in hashtable + crc = 0; + crc = (uint64_t) pcoCrc32(crc, (unsigned char*) data->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + if (data->crc != crc) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING(__FUNCTION__); DLT_STRING(": Invalid datablock found at file offset: "); DLT_INT(offset)); +#ifdef PFS_TEST + printf("DATABLOCK RECOVERY: INVALID CRC FOR CURRENT DATABLOCK DETECTED! \n"); +#endif + ptr = (void*) memory; + //get offset to other data block and check crc + offset = (db->hashTables[i].slots[k].current == 0x00) ? db->hashTables[i].slots[k].offsetB : db->hashTables[i].slots[k].offsetA; + ptr += offset; + data = (DataBlock_s*) ptr; + crc = 0; + crc = (uint64_t) pcoCrc32(crc, (unsigned char*) data->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); + if (data->crc == crc) + { + //switch current flag if valid backup is available + db->hashTables[i].slots[k].current = (db->hashTables[i].slots[k].current == 0x00) ? 0x01 : 0x00; +#ifdef PFS_TEST + printf("DATABLOCK RECOVERY: REPAIR OF INVALID DATA SUCCESSFUL! \n"); +#endif + DLT_LOG(persComLldbDLTCtx, DLT_LOG_DEBUG, DLT_STRING(__FUNCTION__); DLT_STRING(": Invalid datablock for key: <"); DLT_STRING(data->key); DLT_STRING("> successfully recovered!")); + } + else + { + //invalidate data blocks if recovery fails + db->hashTables[i].slots[k].offsetA = - db->hashTables[i].slots[k].offsetA; + db->hashTables[i].slots[k].offsetB = - db->hashTables[i].slots[k].offsetB; + db->hashTables[i].slots[k].current = 0x00; + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": Datablock recovery for key: <"); DLT_STRING(data->key); DLT_STRING("> impossible: both datablocks are invalid!")); +#ifdef PFS_TEST + printf("DATABLOCK RECOVERY: ERROR -> BOTH BLOCKS ARE INVALID! \n"); +#endif + } + } + } + } + } + } + msync(memory, statBuf.st_size, MS_SYNC | MS_INVALIDATE); + munmap(memory, statBuf.st_size); -#if 0 - crc = (uint64_t) pcoCalcCrc32Csum(db->fd, sizeof(Header_s)); - if(ptr->checksum != 0) //do not check if database is currently in creation +#ifdef PFS_TEST + printf("DATABLOCK RECOVERY: END! \n"); +#endif + + return 0; +} + + +int checkIsLink(const char* path, char* linkBuffer) +{ + char fileName[64] = { 0 }; + char truncPath[128] = { 0 }; + int isLink = 0; + int len = 0; + size_t strLen = 0; + struct stat statBuf; + uint16_t i = 0; + + memset(&statBuf, 0, sizeof(statBuf)); + strLen = strlen(path); + for (i = 0; i < strLen; i++) + { + if (path[i] == '/') + { + len = i; // remember the position of the last '/' + } + } + strncpy(truncPath, path, len); + truncPath[len + 1] = '\0'; // set end of string + strncpy(fileName, (const char*) path + len, 64); + + if (lstat(truncPath, &statBuf) != -1) + { + if (S_ISLNK(statBuf.st_mode)) { - if (crc != ptr->checksum) + if (readlink(truncPath, linkBuffer, 256) != -1) { - DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CHECKSUM IN HEADER : "), DLT_UINT64(ptr->checksum), DLT_STRING(" != CHECKSUM CALCULATED: "), DLT_UINT64(crc)); - //db->shmem_info->crc_invalid = Kdb_true; - //return KISSDB_ERROR_CORRUPT_DBFILE; //previous close failed and checksum invalid -> error state -> return error + strncat(linkBuffer, fileName, 256); + isLink = 1; } else { - DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CECHKSUM IN HEADER: "), DLT_UINT64(ptr->checksum), DLT_STRING(" == CHECKSUM CALCULATED: "), DLT_UINT64(crc)); - //db->shmem_info->crc_invalid = Kdb_false; + DLT_LOG(persComLldbDLTCtx, DLT_LOG_DEBUG, + DLT_STRING(__FUNCTION__); DLT_STRING(": readlink failed: "); DLT_STRING(strerror(errno))); + isLink = -1; } } else { - DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("Do not check checksum, database in creation: "), DLT_STRING(db->shmem_ht_name)); + isLink = -1; } + } + else + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, + DLT_STRING(__FUNCTION__); DLT_STRING(": lstat failed: "); DLT_STRING(strerror(errno))); + isLink = -1; + } + return isLink; +} - DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB -> closeOk flag is not set: "), DLT_UINT64(ptr->closeOk)); -#endif +void cleanKdbStruct(KISSDB* db) +{ + if (db->shared != NULL) + { + //Clean for every instance + if (db->mappedDb != NULL) + { + munmap(db->mappedDb, db->dbMappedSize); + db->mappedDb = NULL; + } + if (db->hashTables != NULL) + { + munmap(db->hashTables, db->htMappedSize); + db->hashTables = NULL; + } + if(db->cacheName != NULL) + { + free(db->cacheName); + db->cacheName = NULL; + } + if (db->fd) + { + close(db->fd); + db->fd = 0; + } + if(db->sharedCacheFd) + { + close(db->sharedCacheFd); + db->sharedCacheFd = -1; + } + db->alreadyOpen = Kdb_false; + db->htSize = 0; + //db->cacheReferenced = 0; + db->keySize = 0; + db->valSize = 0; + db->htSizeBytes = 0; + db->htMappedSize = 0; + db->dbMappedSize = 0; + db->shmCreator = 0; + + Kdb_unlock(&db->shared->rwlock); + + //Clean up for last instance referencing the database + if (db->shared->refCount == 0) + { + //close shared hashtable memory + if (db->htFd) + { + kdbShmemClose(db->htFd, db->htName); + db->htFd = 0; + } + if(db->htName != NULL) + { + free(db->htName); + db->htName = NULL; + } + //free rwlocks + pthread_rwlock_destroy(&db->shared->rwlock); + if (db->shared != NULL) + { + munmap(db->shared, sizeof(Shared_Data_s)); + db->shared = NULL; + } + if (db->sharedFd) + { + kdbShmemClose(db->sharedFd, db->sharedName); + db->sharedFd = 0; + } + if(db->sharedName != NULL) + { + free(db->sharedName); + db->sharedName = NULL; + } + + //destroy and unlock named semaphore only if ref counter is zero + if (-1 == sem_post(db->kdbSem)) //release semaphore + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": sem_post() in cleanup failed: "), + DLT_STRING(strerror(errno))); + } + if (-1 == sem_close(db->kdbSem)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": sem_close() in cleanup failed: "), + DLT_STRING(strerror(errno))); + } + if (-1 == sem_unlink(db->semName)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": sem_unlink() in cleanup failed: "), + DLT_STRING(strerror(errno))); + } + db->kdbSem = NULL; + } + else //Clean up if other instances have reference to the database + { + if (db->sharedFd) + { + close(db->sharedFd); + db->sharedFd = 0; + } + if (db->htFd) + { + close(db->htFd); + db->htFd = 0; + } + if (db->shared != NULL) + { + munmap(db->shared, sizeof(Shared_Data_s)); + db->shared = NULL; + } + if(db->htName != NULL) + { + free(db->htName); + db->htName = NULL; + } + if (-1 == sem_post(db->kdbSem)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": sem_post() in cleanup with refcounter > 0 failed: "), + DLT_STRING(strerror(errno))); + } + if (-1 == sem_close(db->kdbSem)) + { + DLT_LOG(persComLldbDLTCtx, DLT_LOG_ERROR, DLT_STRING(__FUNCTION__); DLT_STRING(": sem_close() in cleanup with refcounter > 0 failed: "), + DLT_STRING(strerror(errno))); + } + db->kdbSem = NULL; + } } -#endif - //sync changes with file - if (0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE)) - return KISSDB_ERROR_IO; +} - //unmap memory - if (0 != munmap(ptr, KISSDB_HEADER_SIZE)) - return KISSDB_ERROR_IO; + +int writeDualDataBlock(KISSDB* db, int64_t offset, int htNumber, const void* key, unsigned long klen, const void* value, int valueSize) +{ + DataBlock_s* backupBlock; + DataBlock_s* block; + uint64_t crc = 0x00; + + block = (DataBlock_s*) (db->mappedDb + offset); + block->delimStart = DATA_BLOCK_A_START_DELIMITER; + memcpy(block->key,key, klen); + block->valSize = valueSize; + memcpy(block->value,value, block->valSize); + block->htNum = htNumber; + crc = 0x00; + crc = (uint32_t) pcoCrc32(crc, (unsigned char*)block->key, db->keySize + sizeof(uint32_t) + db->valSize + sizeof(uint64_t)); //crc over key, datasize, data and htnum + block->crc = crc; + block->delimEnd = DATA_BLOCK_A_END_DELIMITER; + + // write same key and value again + backupBlock = (DataBlock_s*) ((char*) block + sizeof(DataBlock_s)); + backupBlock->delimStart = DATA_BLOCK_B_START_DELIMITER; + backupBlock->crc = crc; + memcpy(backupBlock->key,key, klen); + backupBlock->valSize = valueSize; + memcpy(backupBlock->value,value, backupBlock->valSize); + backupBlock->htNum = htNumber; + backupBlock->delimEnd = DATA_BLOCK_B_END_DELIMITER; return 0; } + + +int greatestCommonFactor(int x, int y) +{ + while (y != 0) + { + if (x > y) + { + x = x - y; + } + else + { + y = y - x; + } + } + return x; +} |