/* ldb database library using mdb back end Copyright (C) Jakub Hrozek 2014 Copyright (C) Catalyst.Net Ltd 2017 ** NOTE! The following LGPL license applies to the ldb ** library. This does NOT imply that all of Samba is released ** under the LGPL This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, see . */ #include "ldb_mdb.h" #include "../ldb_key_value/ldb_kv.h" #include "include/dlinklist.h" #define MDB_URL_PREFIX "mdb://" #define MDB_URL_PREFIX_SIZE (sizeof(MDB_URL_PREFIX)-1) #define LDB_MDB_MAX_KEY_LENGTH 511 #define GIGABYTE (1024*1024*1024) int ldb_mdb_err_map(int lmdb_err) { switch (lmdb_err) { case MDB_SUCCESS: return LDB_SUCCESS; case EIO: return LDB_ERR_OPERATIONS_ERROR; #ifdef EBADE case EBADE: #endif case MDB_INCOMPATIBLE: case MDB_CORRUPTED: case MDB_INVALID: return LDB_ERR_UNAVAILABLE; case MDB_BAD_TXN: case MDB_BAD_VALSIZE: #ifdef MDB_BAD_DBI case MDB_BAD_DBI: #endif case MDB_PANIC: case EINVAL: return LDB_ERR_PROTOCOL_ERROR; case MDB_MAP_FULL: case MDB_DBS_FULL: case MDB_READERS_FULL: case MDB_TLS_FULL: case MDB_TXN_FULL: case EAGAIN: return LDB_ERR_BUSY; case MDB_KEYEXIST: return LDB_ERR_ENTRY_ALREADY_EXISTS; case MDB_NOTFOUND: case ENOENT: return LDB_ERR_NO_SUCH_OBJECT; case EACCES: return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS; default: break; } return LDB_ERR_OTHER; } #define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__) static int lmdb_error_at(struct ldb_context *ldb, int ecode, const char *file, int line) { int ldb_err = ldb_mdb_err_map(ecode); char *reason = mdb_strerror(ecode); ldb_asprintf_errstring(ldb, "(%d) - %s at %s:%d", ecode, reason, file, line); return ldb_err; } static bool lmdb_transaction_active(struct ldb_kv_private *ldb_kv) { return ldb_kv->lmdb_private->txlist != NULL; } static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx) { if (ltx == NULL) { return NULL; } return ltx->tx; } static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx) { if (lmdb->txlist) { talloc_steal(lmdb->txlist, ltx); } DLIST_ADD(lmdb->txlist, ltx); } static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx) { DLIST_REMOVE(lmdb->txlist, ltx); talloc_free(ltx); } static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb) { struct lmdb_trans *ltx; ltx = lmdb->txlist; return ltx; } static MDB_txn *get_current_txn(struct lmdb_private *lmdb) { MDB_txn *txn = NULL; txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb)); if (txn != NULL) { return txn; } if (lmdb->read_txn != NULL) { return lmdb->read_txn; } lmdb->error = MDB_BAD_TXN; ldb_set_errstring(lmdb->ldb, __location__":No active transaction\n"); return NULL; } static int lmdb_store(struct ldb_kv_private *ldb_kv, struct ldb_val key, struct ldb_val data, int flags) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; MDB_val mdb_key; MDB_val mdb_data; int mdb_flags; MDB_txn *txn = NULL; MDB_dbi dbi = 0; if (ldb_kv->read_only) { return LDB_ERR_UNWILLING_TO_PERFORM; } txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb)); if (txn == NULL) { ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); lmdb->error = MDB_PANIC; return ldb_mdb_error(lmdb->ldb, lmdb->error); } lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } mdb_key.mv_size = key.length; mdb_key.mv_data = key.data; mdb_data.mv_size = data.length; mdb_data.mv_data = data.data; if (flags == TDB_INSERT) { mdb_flags = MDB_NOOVERWRITE; } else if (flags == TDB_MODIFY) { /* * Modifying a record, ensure that it exists. * This mimics the TDB semantics */ MDB_val value; lmdb->error = mdb_get(txn, dbi, &mdb_key, &value); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } mdb_flags = 0; } else { mdb_flags = 0; } lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } return ldb_mdb_err_map(lmdb->error); } static int lmdb_delete(struct ldb_kv_private *ldb_kv, struct ldb_val key) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; MDB_val mdb_key; MDB_txn *txn = NULL; MDB_dbi dbi = 0; if (ldb_kv->read_only) { return LDB_ERR_UNWILLING_TO_PERFORM; } txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb)); if (txn == NULL) { ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); lmdb->error = MDB_PANIC; return ldb_mdb_error(lmdb->ldb, lmdb->error); } lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } mdb_key.mv_size = key.length; mdb_key.mv_data = key.data; lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } return ldb_mdb_err_map(lmdb->error); } static int lmdb_traverse_fn(struct ldb_kv_private *ldb_kv, ldb_kv_traverse_fn fn, void *ctx) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; MDB_val mdb_key; MDB_val mdb_data; MDB_txn *txn = NULL; MDB_dbi dbi = 0; MDB_cursor *cursor = NULL; int ret; txn = get_current_txn(lmdb); if (txn == NULL) { ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); lmdb->error = MDB_PANIC; return ldb_mdb_error(lmdb->ldb, lmdb->error); } lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } lmdb->error = mdb_cursor_open(txn, dbi, &cursor); if (lmdb->error != MDB_SUCCESS) { goto done; } while ((lmdb->error = mdb_cursor_get( cursor, &mdb_key, &mdb_data, MDB_NEXT)) == MDB_SUCCESS) { struct ldb_val key = { .length = mdb_key.mv_size, .data = mdb_key.mv_data, }; struct ldb_val data = { .length = mdb_data.mv_size, .data = mdb_data.mv_data, }; ret = fn(ldb_kv, key, data, ctx); if (ret != 0) { /* * NOTE: This DOES NOT set lmdb->error! * * This means that the caller will get success. * This matches TDB traverse behaviour, where callbacks * may terminate the traverse, but do not change the * return code from success. * * Callers SHOULD store their own error codes. */ goto done; } } if (lmdb->error == MDB_NOTFOUND) { lmdb->error = MDB_SUCCESS; } done: if (cursor != NULL) { mdb_cursor_close(cursor); } if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } return ldb_mdb_err_map(lmdb->error); } static int lmdb_update_in_iterate(struct ldb_kv_private *ldb_kv, struct ldb_val key, struct ldb_val key2, struct ldb_val data, void *state) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; struct ldb_val copy; int ret = LDB_SUCCESS; /* * Need to take a copy of the data as the delete operation alters the * data, as it is in private lmdb memory. */ copy.length = data.length; copy.data = talloc_memdup(ldb_kv, data.data, data.length); if (copy.data == NULL) { lmdb->error = MDB_PANIC; return ldb_oom(lmdb->ldb); } lmdb->error = lmdb_delete(ldb_kv, key); if (lmdb->error != MDB_SUCCESS) { ldb_debug( lmdb->ldb, LDB_DEBUG_ERROR, "Failed to delete %*.*s " "for rekey as %*.*s: %s", (int)key.length, (int)key.length, (const char *)key.data, (int)key2.length, (int)key2.length, (const char *)key.data, mdb_strerror(lmdb->error)); ret = ldb_mdb_error(lmdb->ldb, lmdb->error); goto done; } lmdb->error = lmdb_store(ldb_kv, key2, copy, 0); if (lmdb->error != MDB_SUCCESS) { ldb_debug( lmdb->ldb, LDB_DEBUG_ERROR, "Failed to rekey %*.*s as %*.*s: %s", (int)key.length, (int)key.length, (const char *)key.data, (int)key2.length, (int)key2.length, (const char *)key.data, mdb_strerror(lmdb->error)); ret = ldb_mdb_error(lmdb->ldb, lmdb->error); goto done; } done: if (copy.data != NULL) { TALLOC_FREE(copy.data); copy.length = 0; } /* * Explicitly invalidate the data, as the delete has done this */ data.length = 0; data.data = NULL; return ret; } /* Handles only a single record */ static int lmdb_parse_record(struct ldb_kv_private *ldb_kv, struct ldb_val key, int (*parser)(struct ldb_val key, struct ldb_val data, void *private_data), void *ctx) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; MDB_val mdb_key; MDB_val mdb_data; MDB_txn *txn = NULL; MDB_dbi dbi; struct ldb_val data; txn = get_current_txn(lmdb); if (txn == NULL) { ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active"); lmdb->error = MDB_PANIC; return ldb_mdb_error(lmdb->ldb, lmdb->error); } lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } mdb_key.mv_size = key.length; mdb_key.mv_data = key.data; lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data); if (lmdb->error != MDB_SUCCESS) { /* TODO closing a handle should not even be necessary */ mdb_dbi_close(lmdb->env, dbi); if (lmdb->error == MDB_NOTFOUND) { return LDB_ERR_NO_SUCH_OBJECT; } if (lmdb->error == MDB_CORRUPTED) { ldb_debug(lmdb->ldb, LDB_DEBUG_ERROR, __location__ ": MDB corrupted for key [%*.*s]\n", (int)key.length, (int)key.length, key.data); } return ldb_mdb_error(lmdb->ldb, lmdb->error); } data.data = mdb_data.mv_data; data.length = mdb_data.mv_size; /* TODO closing a handle should not even be necessary */ mdb_dbi_close(lmdb->env, dbi); return parser(key, data, ctx); } /* * Exactly the same as iterate, except we have a start key and an end key * (which are both included in the results if present). * * If start > end, return MDB_PANIC. */ static int lmdb_iterate_range(struct ldb_kv_private *ldb_kv, struct ldb_val start_key, struct ldb_val end_key, ldb_kv_traverse_fn fn, void *ctx) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; MDB_val mdb_key; MDB_val mdb_data; MDB_txn *txn = NULL; MDB_dbi dbi = 0; MDB_cursor *cursor = NULL; int ret; MDB_val mdb_s_key; MDB_val mdb_e_key; txn = get_current_txn(lmdb); if (txn == NULL) { ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction"); lmdb->error = MDB_PANIC; return ldb_mdb_error(lmdb->ldb, lmdb->error); } lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } mdb_s_key.mv_size = start_key.length; mdb_s_key.mv_data = start_key.data; mdb_e_key.mv_size = end_key.length; mdb_e_key.mv_data = end_key.data; if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) { lmdb->error = MDB_PANIC; return ldb_mdb_error(lmdb->ldb, lmdb->error); } lmdb->error = mdb_cursor_open(txn, dbi, &cursor); if (lmdb->error != MDB_SUCCESS) { goto done; } lmdb->error = mdb_cursor_get(cursor, &mdb_s_key, &mdb_data, MDB_SET_RANGE); if (lmdb->error != MDB_SUCCESS) { if (lmdb->error == MDB_NOTFOUND) { lmdb->error = MDB_SUCCESS; } goto done; } else { struct ldb_val key = { .length = mdb_s_key.mv_size, .data = mdb_s_key.mv_data, }; struct ldb_val data = { .length = mdb_data.mv_size, .data = mdb_data.mv_data, }; if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) { goto done; } ret = fn(ldb_kv, key, data, ctx); if (ret != 0) { /* * NOTE: This DOES NOT set lmdb->error! * * This means that the caller will get success. * This matches TDB traverse behaviour, where callbacks * may terminate the traverse, but do not change the * return code from success. * * Callers SHOULD store their own error codes. */ goto done; } } while ((lmdb->error = mdb_cursor_get( cursor, &mdb_key, &mdb_data, MDB_NEXT)) == MDB_SUCCESS) { struct ldb_val key = { .length = mdb_key.mv_size, .data = mdb_key.mv_data, }; struct ldb_val data = { .length = mdb_data.mv_size, .data = mdb_data.mv_data, }; if (mdb_cmp(txn, dbi, &mdb_key, &mdb_e_key) > 0) { goto done; } ret = fn(ldb_kv, key, data, ctx); if (ret != 0) { /* * NOTE: This DOES NOT set lmdb->error! * * This means that the caller will get success. * This matches TDB traverse behaviour, where callbacks * may terminate the traverse, but do not change the * return code from success. * * Callers SHOULD store their own error codes. */ goto done; } } if (lmdb->error == MDB_NOTFOUND) { lmdb->error = MDB_SUCCESS; } done: if (cursor != NULL) { mdb_cursor_close(cursor); } if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } return ldb_mdb_err_map(lmdb->error); } static int lmdb_lock_read(struct ldb_module *module) { void *data = ldb_module_get_private(module); struct ldb_kv_private *ldb_kv = talloc_get_type(data, struct ldb_kv_private); struct lmdb_private *lmdb = ldb_kv->lmdb_private; pid_t pid = getpid(); if (pid != lmdb->pid) { ldb_asprintf_errstring( lmdb->ldb, __location__": Reusing ldb opened by pid %d in " "process %d\n", lmdb->pid, pid); lmdb->error = MDB_BAD_TXN; return LDB_ERR_PROTOCOL_ERROR; } lmdb->error = MDB_SUCCESS; if (lmdb_transaction_active(ldb_kv) == false && ldb_kv->read_lock_count == 0) { lmdb->error = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &lmdb->read_txn); } if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } ldb_kv->read_lock_count++; return ldb_mdb_err_map(lmdb->error); } static int lmdb_unlock_read(struct ldb_module *module) { void *data = ldb_module_get_private(module); struct ldb_kv_private *ldb_kv = talloc_get_type(data, struct ldb_kv_private); if (lmdb_transaction_active(ldb_kv) == false && ldb_kv->read_lock_count == 1) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; mdb_txn_commit(lmdb->read_txn); lmdb->read_txn = NULL; ldb_kv->read_lock_count--; return LDB_SUCCESS; } ldb_kv->read_lock_count--; return LDB_SUCCESS; } static int lmdb_transaction_start(struct ldb_kv_private *ldb_kv) { struct lmdb_private *lmdb = ldb_kv->lmdb_private; struct lmdb_trans *ltx; struct lmdb_trans *ltx_head; MDB_txn *tx_parent; pid_t pid = getpid(); /* Do not take out the transaction lock on a read-only DB */ if (ldb_kv->read_only) { return LDB_ERR_UNWILLING_TO_PERFORM; } ltx = talloc_zero(lmdb, struct lmdb_trans); if (ltx == NULL) { return ldb_oom(lmdb->ldb); } if (pid != lmdb->pid) { ldb_asprintf_errstring( lmdb->ldb, __location__": Reusing ldb opened by pid %d in " "process %d\n", lmdb->pid, pid); lmdb->error = MDB_BAD_TXN; return LDB_ERR_PROTOCOL_ERROR; } /* * Clear out any stale readers */ { int stale = 0; mdb_reader_check(lmdb->env, &stale); if (stale > 0) { ldb_debug( lmdb->ldb, LDB_DEBUG_ERROR, "LMDB Stale readers, deleted (%d)", stale); } } ltx_head = lmdb_private_trans_head(lmdb); tx_parent = lmdb_trans_get_tx(ltx_head); lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, <x->tx); if (lmdb->error != MDB_SUCCESS) { return ldb_mdb_error(lmdb->ldb, lmdb->error); } trans_push(lmdb, ltx); return ldb_mdb_err_map(lmdb->error); } static int lmdb_transaction_cancel(struct ldb_kv_private *ldb_kv) { struct lmdb_trans *ltx; struct lmdb_private *lmdb = ldb_kv->lmdb_private; ltx = lmdb_private_trans_head(lmdb); if (ltx == NULL) { return LDB_ERR_OPERATIONS_ERROR; } mdb_txn_abort(ltx->tx); trans_finished(lmdb, ltx); return LDB_SUCCESS; } static int lmdb_transaction_prepare_commit(struct ldb_kv_private *ldb_kv) { /* No need to prepare a commit */ return LDB_SUCCESS; } static int lmdb_transaction_commit(struct ldb_kv_private *ldb_kv) { struct lmdb_trans *ltx; struct lmdb_private *lmdb = ldb_kv->lmdb_private; ltx = lmdb_private_trans_head(lmdb); if (ltx == NULL) { return LDB_ERR_OPERATIONS_ERROR; } lmdb->error = mdb_txn_commit(ltx->tx); trans_finished(lmdb, ltx); return lmdb->error; } static int lmdb_error(struct ldb_kv_private *ldb_kv) { return ldb_mdb_err_map(ldb_kv->lmdb_private->error); } static const char *lmdb_errorstr(struct ldb_kv_private *ldb_kv) { return mdb_strerror(ldb_kv->lmdb_private->error); } static const char *lmdb_name(struct ldb_kv_private *ldb_kv) { return "lmdb"; } static bool lmdb_changed(struct ldb_kv_private *ldb_kv) { /* * lmdb does no provide a quick way to determine if the database * has changed. This function always returns true. * * Note that tdb uses a sequence number that allows this function * to be implemented efficiently. */ return true; } /* * Get the number of records in the database. * * The mdb_env_stat call returns an accurate count, so we return the actual * number of records in the database rather than an estimate. */ static size_t lmdb_get_size(struct ldb_kv_private *ldb_kv) { struct MDB_stat stats = {0}; struct lmdb_private *lmdb = ldb_kv->lmdb_private; int ret = 0; ret = mdb_env_stat(lmdb->env, &stats); if (ret != 0) { return 0; } return stats.ms_entries; } /* * Start a sub transaction * As lmdb supports nested transactions we can start a new transaction */ static int lmdb_nested_transaction_start(struct ldb_kv_private *ldb_kv) { int ret = lmdb_transaction_start(ldb_kv); return ret; } /* * Commit a sub transaction * As lmdb supports nested transactions we can commit the nested transaction */ static int lmdb_nested_transaction_commit(struct ldb_kv_private *ldb_kv) { int ret = lmdb_transaction_commit(ldb_kv); return ret; } /* * Cancel a sub transaction * As lmdb supports nested transactions we can cancel the nested transaction */ static int lmdb_nested_transaction_cancel(struct ldb_kv_private *ldb_kv) { int ret = lmdb_transaction_cancel(ldb_kv); return ret; } static struct kv_db_ops lmdb_key_value_ops = { .options = LDB_KV_OPTION_STABLE_READ_LOCK, .store = lmdb_store, .delete = lmdb_delete, .iterate = lmdb_traverse_fn, .update_in_iterate = lmdb_update_in_iterate, .fetch_and_parse = lmdb_parse_record, .iterate_range = lmdb_iterate_range, .lock_read = lmdb_lock_read, .unlock_read = lmdb_unlock_read, .begin_write = lmdb_transaction_start, .prepare_write = lmdb_transaction_prepare_commit, .finish_write = lmdb_transaction_commit, .abort_write = lmdb_transaction_cancel, .error = lmdb_error, .errorstr = lmdb_errorstr, .name = lmdb_name, .has_changed = lmdb_changed, .transaction_active = lmdb_transaction_active, .get_size = lmdb_get_size, .begin_nested_write = lmdb_nested_transaction_start, .finish_nested_write = lmdb_nested_transaction_commit, .abort_nested_write = lmdb_nested_transaction_cancel, }; static const char *lmdb_get_path(const char *url) { const char *path; /* parse the url */ if (strchr(url, ':')) { if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) { return NULL; } path = url + MDB_URL_PREFIX_SIZE; } else { path = url; } return path; } static int lmdb_pvt_destructor(struct lmdb_private *lmdb) { struct lmdb_trans *ltx = NULL; /* Check if this is a forked child */ if (getpid() != lmdb->pid) { int fd = 0; /* * We cannot call mdb_env_close or commit any transactions, * otherwise they might appear finished in the parent. * */ if (mdb_env_get_fd(lmdb->env, &fd) == 0) { close(fd); } /* Remove the pointer, so that no access should occur */ lmdb->env = NULL; return 0; } /* * Close the read transaction if it's open */ if (lmdb->read_txn != NULL) { mdb_txn_abort(lmdb->read_txn); } if (lmdb->env == NULL) { return 0; } /* * Abort any currently active transactions */ ltx = lmdb_private_trans_head(lmdb); while (ltx != NULL) { mdb_txn_abort(ltx->tx); trans_finished(lmdb, ltx); ltx = lmdb_private_trans_head(lmdb); } lmdb->env = NULL; return 0; } struct mdb_env_wrap { struct mdb_env_wrap *next, *prev; dev_t device; ino_t inode; MDB_env *env; pid_t pid; }; static struct mdb_env_wrap *mdb_list; /* destroy the last connection to an mdb */ static int mdb_env_wrap_destructor(struct mdb_env_wrap *w) { mdb_env_close(w->env); DLIST_REMOVE(mdb_list, w); return 0; } static int lmdb_open_env(TALLOC_CTX *mem_ctx, MDB_env **env, struct ldb_context *ldb, const char *path, const size_t env_map_size, unsigned int flags) { int ret; unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS; /* * MDB_NOSUBDIR implies there is a separate file called path and a * separate lockfile called path-lock */ struct mdb_env_wrap *w; struct stat st; pid_t pid = getpid(); int fd = 0; unsigned v; if (stat(path, &st) == 0) { for (w=mdb_list;w;w=w->next) { if (st.st_dev == w->device && st.st_ino == w->inode && pid == w->pid) { /* * We must have only one MDB_env per process */ if (!talloc_reference(mem_ctx, w)) { return ldb_oom(ldb); } *env = w->env; return LDB_SUCCESS; } } } w = talloc(mem_ctx, struct mdb_env_wrap); if (w == NULL) { return ldb_oom(ldb); } ret = mdb_env_create(env); if (ret != 0) { ldb_asprintf_errstring( ldb, "Could not create MDB environment %s: %s\n", path, mdb_strerror(ret)); return ldb_mdb_err_map(ret); } if (env_map_size > 0) { ret = mdb_env_set_mapsize(*env, env_map_size); if (ret != 0) { ldb_asprintf_errstring( ldb, "Could not set MDB mmap() size to %llu " "on %s: %s\n", (unsigned long long)(env_map_size), path, mdb_strerror(ret)); TALLOC_FREE(w); return ldb_mdb_err_map(ret); } } mdb_env_set_maxreaders(*env, 100000); /* * As we ensure that there is only one MDB_env open per database per * process. We can not use the MDB_RDONLY flag, as another ldb may be * opened in read write mode */ if (flags & LDB_FLG_NOSYNC) { mdb_flags |= MDB_NOSYNC; } ret = mdb_env_open(*env, path, mdb_flags, 0644); if (ret != 0) { ldb_asprintf_errstring(ldb, "Could not open DB %s: %s\n", path, mdb_strerror(ret)); TALLOC_FREE(w); return ldb_mdb_err_map(ret); } { MDB_envinfo stat = {0}; ret = mdb_env_info (*env, &stat); if (ret != 0) { ldb_asprintf_errstring( ldb, "Could not get MDB environment stats %s: %s\n", path, mdb_strerror(ret)); return ldb_mdb_err_map(ret); } } ret = mdb_env_get_fd(*env, &fd); if (ret != 0) { ldb_asprintf_errstring(ldb, "Could not obtain DB FD %s: %s\n", path, mdb_strerror(ret)); TALLOC_FREE(w); return ldb_mdb_err_map(ret); } /* Just as for TDB: on exec, don't inherit the fd */ v = fcntl(fd, F_GETFD, 0); if (v == -1) { TALLOC_FREE(w); return LDB_ERR_OPERATIONS_ERROR; } ret = fcntl(fd, F_SETFD, v | FD_CLOEXEC); if (ret == -1) { TALLOC_FREE(w); return LDB_ERR_OPERATIONS_ERROR; } if (fstat(fd, &st) != 0) { ldb_asprintf_errstring( ldb, "Could not stat %s:\n", path); TALLOC_FREE(w); return LDB_ERR_OPERATIONS_ERROR; } w->env = *env; w->device = st.st_dev; w->inode = st.st_ino; w->pid = pid; talloc_set_destructor(w, mdb_env_wrap_destructor); DLIST_ADD(mdb_list, w); return LDB_SUCCESS; } static int lmdb_pvt_open(struct lmdb_private *lmdb, struct ldb_context *ldb, const char *path, const size_t env_map_size, unsigned int flags) { int ret; int lmdb_max_key_length; if (flags & LDB_FLG_DONT_CREATE_DB) { struct stat st; if (stat(path, &st) != 0) { return LDB_ERR_UNAVAILABLE; } } ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, env_map_size, flags); if (ret != 0) { return ret; } /* Close when lmdb is released */ talloc_set_destructor(lmdb, lmdb_pvt_destructor); /* Store the original pid during the LMDB open */ lmdb->pid = getpid(); lmdb_max_key_length = mdb_env_get_maxkeysize(lmdb->env); /* This will never happen, but if it does make sure to freak out */ if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) { return ldb_operr(ldb); } return LDB_SUCCESS; } int lmdb_connect(struct ldb_context *ldb, const char *url, unsigned int flags, const char *options[], struct ldb_module **_module) { const char *path = NULL; struct lmdb_private *lmdb = NULL; struct ldb_kv_private *ldb_kv = NULL; int ret; size_t env_map_size = 0; /* * We hold locks, so we must use a private event context * on each returned handle */ ldb_set_require_private_event_context(ldb); path = lmdb_get_path(url); if (path == NULL) { ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url); return LDB_ERR_OPERATIONS_ERROR; } ldb_kv = talloc_zero(ldb, struct ldb_kv_private); if (!ldb_kv) { ldb_oom(ldb); return LDB_ERR_OPERATIONS_ERROR; } lmdb = talloc_zero(ldb_kv, struct lmdb_private); if (lmdb == NULL) { TALLOC_FREE(ldb_kv); return ldb_oom(ldb); } lmdb->ldb = ldb; ldb_kv->kv_ops = &lmdb_key_value_ops; { const char *size = ldb_options_find( ldb, ldb->options, "lmdb_env_size"); if (size != NULL) { env_map_size = strtoull(size, NULL, 0); } } ret = lmdb_pvt_open(lmdb, ldb, path, env_map_size, flags); if (ret != LDB_SUCCESS) { TALLOC_FREE(ldb_kv); return ret; } ldb_kv->lmdb_private = lmdb; if (flags & LDB_FLG_RDONLY) { ldb_kv->read_only = true; } /* * This maximum length becomes encoded in the index values so * must never change even if LMDB starts to allow longer keys. * The override option is max_key_len_for_self_test, and is * used for testing only. */ ldb_kv->max_key_length = LDB_MDB_MAX_KEY_LENGTH; return ldb_kv_init_store( ldb_kv, "ldb_mdb backend", ldb, options, _module); }