/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: #ident "$Id$" /*====== This file is part of PerconaFT. Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. PerconaFT is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2, as published by the Free Software Foundation. PerconaFT is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with PerconaFT. If not, see . ---------------------------------------- PerconaFT is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License, version 3, as published by the Free Software Foundation. PerconaFT is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with PerconaFT. If not, see . ======= */ #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." #include "test.h" // verify that put_multiple inserts the correct rows into N dictionaries // verify that pu_multiple locks the correct keys for N dictionaries const int max_rows_per_primary = 9; static uint32_t get_total_secondary_rows(uint32_t num_primary) { assert((num_primary % (max_rows_per_primary+1)) == 0); return num_primary / (max_rows_per_primary+1) * ( (max_rows_per_primary) * (max_rows_per_primary+1) / 2 ); } static uint8_t get_num_keys(uint16_t i, uint8_t dbnum) { return (i+dbnum) % (max_rows_per_primary + 1); // 0..9.. 10 choices } static uint16_t get_total_num_keys(uint16_t i, uint8_t num_dbs) { uint16_t sum = 0; for (uint8_t db = 0; db < num_dbs; ++db) { sum += get_num_keys(i, db); } return sum; } static uint32_t get_key(uint16_t i, uint8_t dbnum, uint8_t which) { uint32_t i32 = i; uint32_t dbnum32 = dbnum; uint32_t which32 = which; uint32_t x = (dbnum32<<24) | (i32) | (which32<<8); return x; } static void get_data(uint32_t *v, uint8_t i, uint8_t ndbs) { int index = 0; for (uint8_t dbnum = 0; dbnum < ndbs; dbnum++) { for (uint8_t which = 0; which < get_num_keys(i, dbnum); ++which) { v[index++] = get_key(i, dbnum, which); } } } static int put_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) { (void) src_val; uint8_t dbnum; assert(dest_db->descriptor->dbt.size == sizeof dbnum); memcpy(&dbnum, dest_db->descriptor->dbt.data, sizeof dbnum); assert(dbnum > 0); // Does not get called for primary. assert(dest_db != src_db); assert(src_key->size == 2); uint16_t i = *(uint16_t*)src_key->data; uint8_t num_keys = get_num_keys(i, dbnum); toku_dbt_array_resize(dest_keys, num_keys); if (dest_vals) { toku_dbt_array_resize(dest_vals, num_keys); } for (uint8_t which = 0; which < num_keys; ++which) { DBT *dest_key = &dest_keys->dbts[which]; assert(dest_key->flags == DB_DBT_REALLOC); { // Memory management if (dest_key->ulen < sizeof(uint32_t)) { dest_key->data = toku_xrealloc(dest_key->data, sizeof(uint32_t)); dest_key->ulen = sizeof(uint32_t); } dest_key->size = sizeof(uint32_t); } *(uint32_t*)dest_key->data = get_key(i, dbnum, which); if (dest_vals) { DBT *dest_val = &dest_vals->dbts[which]; dest_val->flags = 0; dest_val->data = nullptr; dest_val->size = 0; } } return 0; } static int del_callback(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, const DBT *src_key, const DBT *src_data) { return put_callback(dest_db, src_db, dest_keys, NULL, src_key, src_data); } static void verify_locked(DB_ENV *env, DB *db, uint8_t dbnum, uint16_t i) { int r; DB_TXN *txn = NULL; r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r); if (dbnum == 0) { DBT key; dbt_init(&key, &i, sizeof i); r = db->del(db, txn, &key, DB_DELETE_ANY); CKERR2(r, DB_LOCK_NOTGRANTED); } else { for (uint8_t which = 0; which < get_num_keys(i, dbnum); ++which) { uint32_t k = get_key(i, dbnum, which); DBT key; dbt_init(&key, &k, sizeof k); r = db->del(db, txn, &key, DB_DELETE_ANY); CKERR2(r, DB_LOCK_NOTGRANTED); } } r = txn->abort(txn); assert_zero(r); } static void verify_seq_primary(DB_ENV *env, DB *db, int dbnum, int ndbs, int nrows) { assert(dbnum==0); int r; DB_TXN *txn = NULL; r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r); DBC *cursor = NULL; r = db->cursor(db, txn, &cursor, 0); assert_zero(r); int i; for (i = 0; ; i++) { DBT key; memset(&key, 0, sizeof key); DBT val; memset(&val, 0, sizeof val); r = cursor->c_get(cursor, &key, &val, DB_NEXT); if (r != 0) break; uint16_t k; assert(key.size == sizeof k); memcpy(&k, key.data, key.size); assert(k == i); uint32_t total_rows = get_total_num_keys(i, ndbs); assert(val.size == total_rows * sizeof (uint32_t)); uint32_t v[total_rows]; get_data(v, i, ndbs); assert(memcmp(val.data, v, val.size) == 0); } assert(i == nrows); r = cursor->c_close(cursor); assert_zero(r); r = txn->commit(txn, 0); assert_zero(r); } static void verify_seq(DB_ENV *env, DB *db, uint8_t dbnum, uint8_t ndbs, uint16_t nrows_primary) { assert(dbnum > 0); assert(dbnum < ndbs); uint32_t nrows = get_total_secondary_rows(nrows_primary); int r; DB_TXN *txn = NULL; r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r); DBC *cursor = NULL; r = db->cursor(db, txn, &cursor, 0); assert_zero(r); uint16_t rows_found = 0; uint16_t source_i = 0; DBT key; memset(&key, 0, sizeof key); DBT val; memset(&val, 0, sizeof val); for (source_i = 0; source_i < nrows_primary; ++source_i) { uint8_t num_keys = get_num_keys(source_i, dbnum); for (uint8_t which = 0; which < num_keys; ++which) { r = cursor->c_get(cursor, &key, &val, DB_NEXT); CKERR(r); uint32_t k; assert(key.size == sizeof k); memcpy(&k, key.data, key.size); assert(k == get_key(source_i, dbnum, which)); assert(val.size == 0); rows_found++; } } r = cursor->c_get(cursor, &key, &val, DB_NEXT); CKERR2(r, DB_NOTFOUND); assert(rows_found == nrows); r = cursor->c_close(cursor); assert_zero(r); r = txn->commit(txn, 0); assert_zero(r); } static void verify(DB_ENV *env, DB *db[], int ndbs, int nrows) { verify_seq_primary(env, db[0], 0, ndbs, nrows); for (int dbnum = 1; dbnum < ndbs; dbnum++) verify_seq(env, db[dbnum], dbnum, ndbs, nrows); } static void verify_empty(DB_ENV *env, DB *db) { int r; DB_TXN *txn = NULL; r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r); DBC *cursor = NULL; r = db->cursor(db, txn, &cursor, 0); assert_zero(r); int i; for (i = 0; ; i++) { DBT key; memset(&key, 0, sizeof key); DBT val; memset(&val, 0, sizeof val); r = cursor->c_get(cursor, &key, &val, DB_NEXT); if (r != 0) break; } assert_zero(i); r = cursor->c_close(cursor); assert_zero(r); r = txn->commit(txn, 0); assert_zero(r); } static void verify_del(DB_ENV *env, DB *db[], int ndbs) { for (int dbnum = 0; dbnum < ndbs; dbnum++) verify_empty(env, db[dbnum]); } static void populate(DB_ENV *env, DB *db[], uint8_t ndbs, uint16_t nrows, bool del) { int r; DB_TXN *txn = NULL; r = env->txn_begin(env, NULL, &txn, 0); assert_zero(r); DBT_ARRAY key_arrays[ndbs]; DBT_ARRAY val_arrays[ndbs]; for (uint8_t i = 0; i < ndbs; ++i) { toku_dbt_array_init(&key_arrays[i], 1); toku_dbt_array_init(&val_arrays[i], 1); } // populate for (uint16_t i = 0; i < nrows; i++) { uint32_t total_rows = get_total_num_keys(i, ndbs); uint16_t k = i; uint32_t v[total_rows]; get_data(v, i, ndbs); DBT pri_key; dbt_init(&pri_key, &k, sizeof k); DBT pri_val; dbt_init(&pri_val, &v[0], sizeof v); uint32_t flags[ndbs]; memset(flags, 0, sizeof flags); if (del) { r = env->del_multiple(env, db[0], txn, &pri_key, &pri_val, ndbs, db, key_arrays, flags); } else { r = env->put_multiple(env, db[0], txn, &pri_key, &pri_val, ndbs, db, key_arrays, val_arrays, flags); } assert_zero(r); for (int dbnum = 0; dbnum < ndbs; dbnum++) verify_locked(env, db[dbnum], dbnum, i); } for (uint8_t i = 0; i < ndbs; ++i) { toku_dbt_array_destroy(&key_arrays[i]); toku_dbt_array_destroy(&val_arrays[i]); } r = txn->commit(txn, 0); assert_zero(r); } static void run_test(int ndbs, int nrows) { int r; DB_ENV *env = NULL; r = db_env_create(&env, 0); assert_zero(r); r = env->set_generate_row_callback_for_put(env, put_callback); assert_zero(r); r = env->set_generate_row_callback_for_del(env, del_callback); assert_zero(r); r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); DB *db[ndbs]; for (uint8_t dbnum = 0; dbnum < ndbs; dbnum++) { r = db_create(&db[dbnum], env, 0); assert_zero(r); DBT dbt_dbnum; dbt_init(&dbt_dbnum, &dbnum, sizeof dbnum); char dbname[32]; sprintf(dbname, "%d.tdb", dbnum); r = db[dbnum]->open(db[dbnum], NULL, dbname, NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); IN_TXN_COMMIT(env, NULL, txn_desc, 0, { { int chk_r = db[dbnum]->change_descriptor(db[dbnum], txn_desc, &dbt_dbnum, 0); CKERR(chk_r); } }); } populate(env, db, ndbs, nrows, false); verify(env, db, ndbs, nrows); populate(env, db, ndbs, nrows, true); verify_del(env, db, ndbs); for (int dbnum = 0; dbnum < ndbs; dbnum++) r = db[dbnum]->close(db[dbnum], 0); assert_zero(r); r = env->close(env, 0); assert_zero(r); } int test_main(int argc, char * const argv[]) { int r; int ndbs = 16; int nrows = 100; // parse_args(argc, argv); for (int i = 1; i < argc; i++) { char * const arg = argv[i]; if (strcmp(arg, "-v") == 0) { verbose++; continue; } if (strcmp(arg, "-q") == 0) { verbose = 0; continue; } if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) { ndbs = atoi(argv[++i]); continue; } if (strcmp(arg, "--nrows") == 0 && i+1 < argc) { nrows = atoi(argv[++i]); continue; } } //rows should be divisible by max_rows + 1 (so that we have an equal number of each type and we know the total) if (nrows % (max_rows_per_primary+1) != 0) { nrows += (max_rows_per_primary+1) - (nrows % (max_rows_per_primary+1)); } assert(ndbs >= 0); assert(ndbs < (1<<8) - 1); assert(nrows >= 0); assert(nrows < (1<<15)); // Leave plenty of room toku_os_recursive_delete(TOKU_TEST_FILENAME); r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); run_test(ndbs, nrows); return 0; }