diff options
author | unknown <tomas@poseidon.ndb.mysql.com> | 2004-10-03 21:39:04 +0000 |
---|---|---|
committer | unknown <tomas@poseidon.ndb.mysql.com> | 2004-10-03 21:39:04 +0000 |
commit | 6b1895b86400e8531dd99b89421bc44984568ed8 (patch) | |
tree | aba1618cac773466e58a7ff45f66bb205fe52130 /sql/ha_ndbcluster.cc | |
parent | 3a15d0884d0b9a248d2f3b0f66a05c35b942badc (diff) | |
parent | 25bcfeef6093dfdfcd84323dc1ed4723ea71f7fc (diff) | |
download | mariadb-git-6b1895b86400e8531dd99b89421bc44984568ed8.tar.gz |
Merge
BitKeeper/etc/logging_ok:
auto-union
sql/ha_ndbcluster.h:
Auto merged
sql/handler.cc:
Auto merged
sql/handler.h:
Auto merged
sql/mysql_priv.h:
Auto merged
sql/sql_base.cc:
Auto merged
sql/sql_show.cc:
Auto merged
sql/sql_table.cc:
Auto merged
mysql-test/mysql-test-run.sh:
SCCS merged
sql/ha_ndbcluster.cc:
SCCS merged
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 512 |
1 files changed, 341 insertions, 171 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index e48ac8cc75e..d7a580dbb88 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -32,9 +32,6 @@ #include <ndbapi/NdbApi.hpp> #include <ndbapi/NdbScanFilter.hpp> -#define USE_DISCOVER_ON_STARTUP -//#define USE_NDB_POOL - // Default value for parallelism static const int parallelism= 240; @@ -48,11 +45,13 @@ static const ha_rows autoincrement_prefetch= 32; // connectstring to cluster if given by mysqld const char *ndbcluster_connectstring= 0; +static const char *ha_ndb_ext=".ndb"; + #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 #define ERR_PRINT(err) \ - DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message)) + DBUG_PRINT("error", ("%d message: %s", err.code, err.message)) #define ERR_RETURN(err) \ { \ @@ -107,7 +106,9 @@ static const err_code_mapping err_map[]= { 893, HA_ERR_FOUND_DUPP_UNIQUE }, { 721, HA_ERR_TABLE_EXIST }, { 4244, HA_ERR_TABLE_EXIST }, - { 241, HA_ERR_OLD_METADATA }, + + { 709, HA_ERR_NO_SUCH_TABLE }, + { 284, HA_ERR_NO_SUCH_TABLE }, { 266, HA_ERR_LOCK_WAIT_TIMEOUT }, { 274, HA_ERR_LOCK_WAIT_TIMEOUT }, @@ -147,7 +148,25 @@ int execute_no_commit(ha_ndbcluster *h, NdbConnection *trans) int m_batch_execute= 0; if (false && m_batch_execute) return 0; - return trans->execute(NoCommit); + return trans->execute(NoCommit,AbortOnError,1); +} + +inline +int execute_commit(ha_ndbcluster *h, NdbConnection *trans) +{ + int m_batch_execute= 0; + if (false && m_batch_execute) + return 0; + return trans->execute(Commit,AbortOnError,1); +} + +inline +int execute_no_commit_ie(ha_ndbcluster *h, NdbConnection *trans) +{ + int m_batch_execute= 0; + if (false && m_batch_execute) + return 0; + return trans->execute(NoCommit,IgnoreError,1); } /* @@ -178,19 +197,9 @@ struct Ndb_table_local_info { ha_rows records; }; -void ha_ndbcluster::set_rec_per_key() -{ - DBUG_ENTER("ha_ndbcluster::get_status_const"); - for (uint i=0 ; i < table->keys ; i++) - { - table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1; - } - DBUG_VOID_RETURN; -} - void ha_ndbcluster::records_update() { - DBUG_ENTER("ha_ndbcluster::get_status_variable"); + DBUG_ENTER("ha_ndbcluster::records_update"); struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info; DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d", ((const NDBTAB *)m_table)->getTableId(), @@ -278,6 +287,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans) NDBDICT *dict= m_ndb->getDictionary(); DBUG_PRINT("info", ("invalidateTable %s", m_tabname)); dict->invalidateTable(m_tabname); + table->version=0L; /* Free when thread is ready */ break; } default: @@ -318,7 +328,8 @@ bool ha_ndbcluster::get_error_message(int error, /* Check if type is supported by NDB. - TODO Use this once, not in every operation + TODO Use this once in open(), not in every operation + */ static inline bool ndb_supported_type(enum_field_types type) @@ -665,7 +676,7 @@ int ha_ndbcluster::get_metadata(const char *path) memcmp(pack_data, tab->getFrmData(), pack_length))); DBUG_DUMP("pack_data", (char*)pack_data, pack_length); DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength()); - error= HA_ERR_OLD_METADATA; + error= 3; invalidating_ndb_table= false; } } @@ -689,11 +700,11 @@ int ha_ndbcluster::get_metadata(const char *path) int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase) { + uint i; int error= 0; - char *name; - const char *index_name; + const char *name, *index_name; + char unique_index_name[FN_LEN]; static const char* unique_suffix= "$unique"; - uint i, name_len; KEY* key_info= tab->key_info; const char **key_name= tab->keynames.type_names; NdbDictionary::Dictionary *dict= m_ndb->getDictionary(); @@ -707,21 +718,15 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase) m_index[i].type= idx_type; if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX) { - name_len= strlen(index_name)+strlen(unique_suffix)+1; - // Create name for unique index by appending "$unique"; - if (!(name= my_malloc(name_len, MYF(MY_WME)))) - DBUG_RETURN(2); - strxnmov(name, name_len, index_name, unique_suffix, NullS); - m_index[i].unique_name= name; - DBUG_PRINT("info", ("Created unique index name: %s for index %d", - name, i)); + strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS); + DBUG_PRINT("info", ("Created unique index name \'%s\' for index %d", + unique_index_name, i)); } // Create secondary indexes if in create phase if (phase == ILBP_CREATE) { - DBUG_PRINT("info", ("Creating index %u: %s", i, index_name)); - - switch (m_index[i].type){ + DBUG_PRINT("info", ("Creating index %u: %s", i, index_name)); + switch (idx_type){ case PRIMARY_KEY_INDEX: // Do nothing, already created @@ -731,10 +736,10 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase) break; case UNIQUE_ORDERED_INDEX: if (!(error= create_ordered_index(index_name, key_info))) - error= create_unique_index(get_unique_index_name(i), key_info); + error= create_unique_index(unique_index_name, key_info); break; case UNIQUE_INDEX: - error= create_unique_index(get_unique_index_name(i), key_info); + error= create_unique_index(unique_index_name, key_info); break; case ORDERED_INDEX: error= create_ordered_index(index_name, key_info); @@ -751,21 +756,20 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase) } } // Add handles to index objects - DBUG_PRINT("info", ("Trying to add handle to index %s", index_name)); - if ((m_index[i].type != PRIMARY_KEY_INDEX) && - (m_index[i].type != UNIQUE_INDEX)) + if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX) { + DBUG_PRINT("info", ("Get handle to index %s", index_name)); const NDBINDEX *index= dict->getIndex(index_name, m_tabname); if (!index) DBUG_RETURN(1); m_index[i].index= (void *) index; } - if (m_index[i].unique_name) + if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX) { - const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname); + DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name)); + const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname); if (!index) DBUG_RETURN(1); m_index[i].unique_index= (void *) index; } - DBUG_PRINT("info", ("Added handle to index %s", index_name)); } DBUG_RETURN(error); @@ -801,9 +805,6 @@ void ha_ndbcluster::release_metadata() // Release index list for (i= 0; i < MAX_KEY; i++) { - if (m_index[i].unique_name) - my_free((char*)m_index[i].unique_name, MYF(0)); - m_index[i].unique_name= NULL; m_index[i].unique_index= NULL; m_index[i].index= NULL; } @@ -813,17 +814,15 @@ void ha_ndbcluster::release_metadata() int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) { - int lm; if (type == TL_WRITE_ALLOW_WRITE) - lm= NdbOperation::LM_Exclusive; + return NdbOperation::LM_Exclusive; else if (uses_blob_value(retrieve_all_fields)) /* TODO use a new scan mode to read + lock + keyinfo */ - lm= NdbOperation::LM_Exclusive; + return NdbOperation::LM_Exclusive; else - lm= NdbOperation::LM_CommittedRead; - return lm; + return NdbOperation::LM_CommittedRead; } static const ulong index_type_flags[]= @@ -861,16 +860,6 @@ static const ulong index_type_flags[]= static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong); -inline const char* ha_ndbcluster::get_index_name(uint idx_no) const -{ - return table->keynames.type_names[idx_no]; -} - -inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const -{ - return m_index[idx_no].unique_name; -} - inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const { DBUG_ASSERT(idx_no < MAX_KEY); @@ -1006,7 +995,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) } } - if (trans->execute(NoCommit, IgnoreError) != 0) + if (execute_no_commit_ie(this,trans) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); @@ -1087,7 +1076,6 @@ int ha_ndbcluster::unique_index_read(const byte *key, DBUG_ENTER("unique_index_read"); DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index)); DBUG_DUMP("key", (char*)key, key_len); - DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index))); NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); @@ -1127,7 +1115,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, } } - if (trans->execute(NoCommit, IgnoreError) != 0) + if (execute_no_commit_ie(this,trans) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); @@ -1204,7 +1192,7 @@ inline int ha_ndbcluster::next_result(byte *buf) } else { - if (ops_pending && (trans->execute(Commit) != 0)) + if (ops_pending && (execute_commit(this,trans) != 0)) DBUG_RETURN(ndb_err(trans)); trans->restart(); } @@ -1343,7 +1331,6 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, NdbConnection *trans= m_active_trans; NdbResultSet *cursor; NdbIndexScanOperation *op; - const char *index_name; DBUG_ENTER("ordered_index_scan"); DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted)); @@ -1352,7 +1339,6 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_EXECUTE("enter", print_key(start_key, "start_key");); DBUG_EXECUTE("enter", print_key(end_key, "end_key");); - index_name= get_index_name(active_index); NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); @@ -1561,8 +1547,8 @@ int ha_ndbcluster::write_row(byte *record) } statistic_increment(ha_write_count,&LOCK_status); - if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) - table->timestamp_field->set_time(); + if (table->timestamp_default_now) + update_timestamp(record+table->timestamp_default_now-1); has_auto_increment= (table->next_number_field && record == table->record[0]); if (!(op= trans->getNdbOperation((const NDBTAB *) m_table))) @@ -1639,7 +1625,7 @@ int ha_ndbcluster::write_row(byte *record) } else { - if (trans->execute(Commit) != 0) + if (execute_commit(this,trans) != 0) { skip_auto_increment= true; no_uncommitted_rows_execute_failure(); @@ -1712,9 +1698,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) DBUG_ENTER("update_row"); statistic_increment(ha_update_count,&LOCK_status); - if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) - table->timestamp_field->set_time(); - + if (table->timestamp_on_update_now) + update_timestamp(new_data+table->timestamp_on_update_now-1); + /* Check for update of primary key for special handling */ if ((table->primary_key != MAX_KEY) && (key_cmp(table->primary_key, old_data, new_data))) @@ -2419,6 +2405,8 @@ void ha_ndbcluster::info(uint flag) DBUG_PRINT("info", ("HA_STATUS_NO_LOCK")); if (flag & HA_STATUS_TIME) DBUG_PRINT("info", ("HA_STATUS_TIME")); + if (flag & HA_STATUS_CONST) + DBUG_PRINT("info", ("HA_STATUS_CONST")); if (flag & HA_STATUS_VARIABLE) { DBUG_PRINT("info", ("HA_STATUS_VARIABLE")); @@ -2434,11 +2422,6 @@ void ha_ndbcluster::info(uint flag) } } } - if (flag & HA_STATUS_CONST) - { - DBUG_PRINT("info", ("HA_STATUS_CONST")); - set_rec_per_key(); - } if (flag & HA_STATUS_ERRKEY) { DBUG_PRINT("info", ("HA_STATUS_ERRKEY")); @@ -2660,7 +2643,7 @@ int ha_ndbcluster::reset() const char **ha_ndbcluster::bas_ext() const -{ static const char *ext[1]= { NullS }; return ext; } +{ static const char *ext[]= { ".ndb", NullS }; return ext; } /* @@ -2684,14 +2667,13 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, enum thr_lock_type lock_type) { DBUG_ENTER("store_lock"); - if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK) { - + /* If we are not doing a LOCK TABLE, then allow multiple writers */ - if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + if ((lock_type >= TL_WRITE_ALLOW_WRITE && lock_type <= TL_WRITE) && !thd->in_lock_tables) lock_type= TL_WRITE_ALLOW_WRITE; @@ -2858,18 +2840,26 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) } m_table= NULL; m_table_info= NULL; - if (m_active_trans) - DBUG_PRINT("warning", ("m_active_trans != NULL")); + /* + This is the place to make sure this handler instance + no longer are connected to the active transaction. + + And since the handler is no longer part of the transaction + it can't have open cursors, ops or blobs pending. + */ + m_active_trans= NULL; + if (m_active_cursor) DBUG_PRINT("warning", ("m_active_cursor != NULL")); + m_active_cursor= NULL; + if (blobs_pending) DBUG_PRINT("warning", ("blobs_pending != 0")); + blobs_pending= 0; + if (ops_pending) DBUG_PRINT("warning", ("ops_pending != 0L")); - m_active_trans= NULL; - m_active_cursor= NULL; ops_pending= 0; - blobs_pending= 0; } DBUG_RETURN(error); } @@ -2929,7 +2919,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction) "stmt" : "all")); DBUG_ASSERT(ndb && trans); - if (trans->execute(Commit) != 0) + if (execute_commit(0,trans) != 0) { const NdbError err= trans->getNdbError(); const NdbOperation *error_op= trans->getNdbErrorOperation(); @@ -3180,12 +3170,24 @@ int ha_ndbcluster::create(const char *name, const void *data, *pack_data; const char **key_names= form->keynames.type_names; char name2[FN_HEADLEN]; + bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE); DBUG_ENTER("create"); DBUG_PRINT("enter", ("name: %s", name)); fn_format(name2, name, "", "",2); // Remove the .frm extension set_dbname(name2); - set_tabname(name2); + set_tabname(name2); + + if (create_from_engine) + { + /* + Table alreay exists in NDB and frm file has been created by + caller. + Do Ndb specific stuff, such as create a .ndb file + */ + my_errno= write_ndb_file(); + DBUG_RETURN(my_errno); + } DBUG_PRINT("table", ("name: %s", m_tabname)); tab.setName(m_tabname); @@ -3226,16 +3228,12 @@ int ha_ndbcluster::create(const char *name, tab.addColumn(col); } - my_errno= 0; - if (check_ndb_connection()) - { - my_errno= HA_ERR_NO_CONNECTION; + if ((my_errno= check_ndb_connection())) DBUG_RETURN(my_errno); - } // Create the table in NDB NDBDICT *dict= m_ndb->getDictionary(); - if (dict->createTable(tab)) + if (dict->createTable(tab) != 0) { const NdbError err= dict->getNdbError(); ERR_PRINT(err); @@ -3248,6 +3246,9 @@ int ha_ndbcluster::create(const char *name, // Create secondary indexes my_errno= build_index_list(form, ILBP_CREATE); + if (!my_errno) + my_errno= write_ndb_file(); + DBUG_RETURN(my_errno); } @@ -3283,7 +3284,6 @@ int ha_ndbcluster::create_index(const char *name, DBUG_ENTER("create_index"); DBUG_PRINT("enter", ("name: %s ", name)); - // NdbDictionary::Index ndb_index(name); NdbDictionary::Index ndb_index(name); if (unique) ndb_index.setType(NdbDictionary::Index::UniqueHashIndex); @@ -3324,14 +3324,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) set_tabname(from); set_tabname(to, new_tabname); - if (check_ndb_connection()) { - my_errno= HA_ERR_NO_CONNECTION; - DBUG_RETURN(my_errno); - } + if (check_ndb_connection()) + DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION); + int result= alter_table_name(m_tabname, new_tabname); if (result == 0) + { set_tabname(to); + handler::rename_table(from, to); + } DBUG_RETURN(result); } @@ -3376,6 +3378,8 @@ int ha_ndbcluster::delete_table(const char *name) if (check_ndb_connection()) DBUG_RETURN(HA_ERR_NO_CONNECTION); + + handler::delete_table(name); DBUG_RETURN(drop_table()); } @@ -3477,7 +3481,6 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): for (i= 0; i < MAX_KEY; i++) { m_index[i].type= UNDEFINED_INDEX; - m_index[i].unique_name= NULL; m_index[i].unique_index= NULL; m_index[i].index= NULL; } @@ -3520,7 +3523,6 @@ ha_ndbcluster::~ha_ndbcluster() int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) { - int res; KEY *key; DBUG_ENTER("open"); DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d", @@ -3547,11 +3549,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) free_share(m_share); m_share= 0; DBUG_RETURN(HA_ERR_NO_CONNECTION); } - res= get_metadata(name); - if (!res) - info(HA_STATUS_VARIABLE | HA_STATUS_CONST); - - DBUG_RETURN(res); + + DBUG_RETURN(get_metadata(name)); } @@ -3575,13 +3574,7 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb() Thd_ndb *thd_ndb; DBUG_ENTER("seize_thd_ndb"); -#ifdef USE_NDB_POOL - // Seize from pool - ndb= Ndb::seize(); - xxxxxxxxxxxxxx error -#else thd_ndb= new Thd_ndb(); -#endif thd_ndb->ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info)); if (thd_ndb->ndb->init(max_transactions) != 0) { @@ -3602,46 +3595,45 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb() void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb) { DBUG_ENTER("release_thd_ndb"); -#ifdef USE_NDB_POOL - // Release to pool - Ndb::release(ndb); - xxxxxxxxxxxx error -#else delete thd_ndb; -#endif DBUG_VOID_RETURN; } /* - If this thread already has a Ndb object allocated + If this thread already has a Thd_ndb object allocated in current THD, reuse it. Otherwise - seize a Ndb object, assign it to current THD and use it. - - Having a Ndb object also means that a connection to - NDB cluster has been opened. The connection is - checked. + seize a Thd_ndb object, assign it to current THD and use it. */ -int ha_ndbcluster::check_ndb_connection() +Ndb* check_ndb_in_thd(THD* thd) { - THD *thd= current_thd; + DBUG_ENTER("check_ndb_in_thd"); Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb; - DBUG_ENTER("check_ndb_connection"); if (!thd_ndb) { - thd_ndb= seize_thd_ndb(); - if (!thd_ndb) - DBUG_RETURN(2); + if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) + DBUG_RETURN(NULL); thd->transaction.thd_ndb= thd_ndb; } - m_ndb= thd_ndb->ndb; + DBUG_RETURN(thd_ndb->ndb); +} + + +int ha_ndbcluster::check_ndb_connection() +{ + THD* thd= current_thd; + DBUG_ENTER("check_ndb_connection"); + + if (!(m_ndb= check_ndb_in_thd(thd))) + DBUG_RETURN(HA_ERR_NO_CONNECTION); m_ndb->setDatabaseName(m_dbname); DBUG_RETURN(0); } + void ndbcluster_close_connection(THD *thd) { Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb; @@ -3659,28 +3651,29 @@ void ndbcluster_close_connection(THD *thd) Try to discover one table from NDB */ -int ndbcluster_discover(const char *dbname, const char *name, +int ndbcluster_discover(THD* thd, const char *db, const char *name, const void** frmblob, uint* frmlen) { uint len; const void* data; const NDBTAB* tab; + Ndb* ndb; DBUG_ENTER("ndbcluster_discover"); - DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name)); + DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); - Ndb ndb(g_ndb_cluster_connection, dbname); - ndb.getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info)); + if (!(ndb= check_ndb_in_thd(thd))) + DBUG_RETURN(HA_ERR_NO_CONNECTION); + ndb->setDatabaseName(db); - if (ndb.init()) - ERR_RETURN(ndb.getNdbError()); - - if (ndb.waitUntilReady(0)) - ERR_RETURN(ndb.getNdbError()); - - if (!(tab= ndb.getDictionary()->getTable(name))) - { - DBUG_PRINT("info", ("Table %s not found", name)); - DBUG_RETURN(1); + NDBDICT* dict= ndb->getDictionary(); + dict->set_local_table_data_size(sizeof(Ndb_table_local_info)); + dict->invalidateTable(name); + if (!(tab= dict->getTable(name))) + { + const NdbError err= dict->getNdbError(); + if (err.code == 709) + DBUG_RETURN(1); + ERR_RETURN(err); } DBUG_PRINT("info", ("Found table %s", tab->getName())); @@ -3702,41 +3695,196 @@ int ndbcluster_discover(const char *dbname, const char *name, DBUG_RETURN(0); } - -#ifdef USE_DISCOVER_ON_STARTUP /* - Dicover tables from NDB Cluster - - fetch a list of tables from NDB - - store the frm file for each table on disk - - if the table has an attached frm file - - if the database of the table exists -*/ + Check if a table exists in NDB + + */ -int ndb_discover_tables() +int ndbcluster_table_exists(THD* thd, const char *db, const char *name) +{ + uint len; + const void* data; + const NDBTAB* tab; + Ndb* ndb; + DBUG_ENTER("ndbcluster_table_exists"); + DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); + + if (!(ndb= check_ndb_in_thd(thd))) + DBUG_RETURN(HA_ERR_NO_CONNECTION); + ndb->setDatabaseName(db); + + NDBDICT* dict= ndb->getDictionary(); + dict->set_local_table_data_size(sizeof(Ndb_table_local_info)); + dict->invalidateTable(name); + if (!(tab= dict->getTable(name))) + { + const NdbError err= dict->getNdbError(); + if (err.code == 709) + DBUG_RETURN(0); + ERR_RETURN(err); + } + + DBUG_PRINT("info", ("Found table %s", tab->getName())); + DBUG_RETURN(1); +} + + + +extern "C" byte* tables_get_key(const char *entry, uint *length, + my_bool not_used __attribute__((unused))) +{ + *length= strlen(entry); + return (byte*) entry; +} + + +int ndbcluster_find_files(THD *thd,const char *db,const char *path, + const char *wild, bool dir, List<char> *files) { uint i; + Ndb* ndb; + char name[FN_REFLEN]; + HASH ndb_tables, ok_tables; NdbDictionary::Dictionary::List list; - NdbDictionary::Dictionary* dict; - char path[FN_REFLEN]; - DBUG_ENTER("ndb_discover_tables"); - - /* List tables in NDB Cluster kernel */ - dict= g_ndb->getDictionary(); + DBUG_ENTER("ndbcluster_find_files"); + DBUG_PRINT("enter", ("db: %s", db)); + + if (!(ndb= check_ndb_in_thd(thd))) + DBUG_RETURN(HA_ERR_NO_CONNECTION); + + if (dir) + DBUG_RETURN(0); // Discover of databases not yet supported + + // List tables in NDB + NDBDICT *dict= ndb->getDictionary(); if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) ERR_RETURN(dict->getNdbError()); - + + if (hash_init(&ndb_tables, system_charset_info,list.count,0,0, + (hash_get_key)tables_get_key,0,0)) + { + DBUG_PRINT("error", ("Failed to init HASH ndb_tables")); + DBUG_RETURN(-1); + } + + if (hash_init(&ok_tables, system_charset_info,32,0,0, + (hash_get_key)tables_get_key,0,0)) + { + DBUG_PRINT("error", ("Failed to init HASH ok_tables")); + hash_free(&ndb_tables); + DBUG_RETURN(-1); + } + for (i= 0 ; i < list.count ; i++) { NdbDictionary::Dictionary::List::Element& t= list.elements[i]; + DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name)); - DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name)); - if (create_table_from_handler(t.database, t.name, true)) - DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name)); + // Apply wildcard to list of tables in NDB + if (wild) + { + if (lower_case_table_names) + { + if (wild_case_compare(files_charset_info, t.name, wild)) + continue; + } + else if (wild_compare(t.name,wild,0)) + continue; + } + DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name)); + my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name)); } - DBUG_RETURN(0); + + char *file_name; + List_iterator<char> it(*files); + List<char> delete_list; + while ((file_name=it++)) + { + DBUG_PRINT("info", ("%s", file_name)); + if (hash_search(&ndb_tables, file_name, strlen(file_name))) + { + DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name)); + // File existed in NDB and as frm file, put in ok_tables list + my_hash_insert(&ok_tables, (byte*)file_name); + continue; + } + + // File is not in NDB, check for .ndb file with this name + (void)strxnmov(name, FN_REFLEN, + mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS); + DBUG_PRINT("info", ("Check access for %s", name)); + if (access(name, F_OK)) + { + DBUG_PRINT("info", ("%s did not exist on disk", name)); + // .ndb file did not exist on disk, another table type + continue; + } + + DBUG_PRINT("info", ("%s existed on disk", name)); + // The .ndb file exists on disk, but it's not in list of tables in ndb + // Verify that handler agrees table is gone. + if (ndbcluster_table_exists(thd, db, file_name) == 0) + { + DBUG_PRINT("info", ("NDB says %s does not exists", file_name)); + it.remove(); + // Put in list of tables to remove from disk + delete_list.push_back(thd->strdup(file_name)); + } + } + + // Check for new files to discover + DBUG_PRINT("info", ("Checking for new files to discover")); + List<char> create_list; + for (i= 0 ; i < ndb_tables.records ; i++) + { + file_name= hash_element(&ndb_tables, i); + if (!hash_search(&ok_tables, file_name, strlen(file_name))) + { + DBUG_PRINT("info", ("%s must be discovered", file_name)); + // File is in list of ndb tables and not in ok_tables + // This table need to be created + create_list.push_back(thd->strdup(file_name)); + } + } + + // Lock mutex before deleting and creating frm files + pthread_mutex_lock(&LOCK_open); + + if (!global_read_lock) + { + // Delete old files + List_iterator_fast<char> it3(delete_list); + while ((file_name=it3++)) + { + DBUG_PRINT("info", ("Remove table %s/%s",db, file_name )); + // Delete the table and all related files + TABLE_LIST table_list; + bzero((char*) &table_list,sizeof(table_list)); + table_list.db= (char*) db; + table_list.real_name=(char*)file_name; + (void)mysql_rm_table_part2(thd, &table_list, + /* if_exists */ true, + /* drop_temporary */ false, + /* dont_log_query*/ true); + } + } + + // Create new files + List_iterator_fast<char> it2(create_list); + while ((file_name=it2++)) + { + DBUG_PRINT("info", ("Table %s need discovery", name)); + ha_create_table_from_engine(thd, db, file_name, true); + files->push_back(thd->strdup(file_name)); + } + + pthread_mutex_unlock(&LOCK_open); + + hash_free(&ok_tables); + hash_free(&ndb_tables); + DBUG_RETURN(0); } -#endif /* @@ -3791,7 +3939,7 @@ bool ndbcluster_init() ndbcluster_inited= 1; #ifdef USE_DISCOVER_ON_STARTUP - if (res == 0 && ndb_discover_tables() != 0) + if (ndb_discover_tables() != 0) DBUG_RETURN(TRUE); #endif DBUG_RETURN(false); @@ -3807,7 +3955,6 @@ bool ndbcluster_init() bool ndbcluster_end() { DBUG_ENTER("ndbcluster_end"); - if(g_ndb) delete g_ndb; g_ndb= NULL; @@ -3817,9 +3964,6 @@ bool ndbcluster_end() if (!ndbcluster_inited) DBUG_RETURN(0); hash_free(&ndbcluster_open_tables); -#ifdef USE_NDB_POOL - ndb_pool_release(); -#endif pthread_mutex_destroy(&ndbcluster_mutex); ndbcluster_inited= 0; DBUG_RETURN(0); @@ -4138,7 +4282,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, if (pOp == NULL) break; - NdbResultSet* rs= pOp->readTuples(NdbScanOperation::LM_Dirty); + NdbResultSet* rs= pOp->readTuples(NdbOperation::LM_CommittedRead); if (rs == 0) break; @@ -4178,4 +4322,30 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, DBUG_RETURN(-1); } +/* + Create a .ndb file to serve as a placeholder indicating + that the table with this name is a ndb table +*/ + +int ha_ndbcluster::write_ndb_file() +{ + File file; + bool error=1; + char path[FN_REFLEN]; + + DBUG_ENTER("write_ndb_file"); + DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname)); + + (void)strxnmov(path, FN_REFLEN, + mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS); + + if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0) + { + // It's an empty file + error=0; + my_close(file,MYF(0)); + } + DBUG_RETURN(error); +} + #endif /* HAVE_NDBCLUSTER_DB */ |