diff options
author | unknown <tomas@poseidon.ndb.mysql.com[tomas]> | 2005-11-06 00:20:37 +0100 |
---|---|---|
committer | unknown <tomas@poseidon.ndb.mysql.com[tomas]> | 2005-11-06 00:20:37 +0100 |
commit | 65ab8b01f54f8f515dae4a61ed8a8eb927c624f3 (patch) | |
tree | 916e0aadd51f3c57e92a75aa245cd68b768b726a /sql/ha_ndbcluster.cc | |
parent | 2dc00e6a6927022e21b824cc1da4e81792efeaa7 (diff) | |
download | mariadb-git-65ab8b01f54f8f515dae4a61ed8a8eb927c624f3.tar.gz |
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 885 |
1 files changed, 673 insertions, 212 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 2340cc006f6..fc31f4854ab 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -46,6 +46,7 @@ static const int parallelism= 0; static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used static const char *ha_ndb_ext=".ndb"; +static const char share_prefix[]= "./"; static int ndbcluster_close_connection(THD *thd); static int ndbcluster_commit(THD *thd, bool all); @@ -99,12 +100,15 @@ handlerton ndbcluster_hton = { } // Typedefs for long names +typedef NdbDictionary::Object NDBOBJ; typedef NdbDictionary::Column NDBCOL; typedef NdbDictionary::Table NDBTAB; typedef NdbDictionary::Index NDBINDEX; typedef NdbDictionary::Dictionary NDBDICT; +typedef NdbDictionary::Event NDBEVENT; -bool ndbcluster_inited= FALSE; +static int ndbcluster_inited= 0; +static int ndbcluster_util_inited= 0; static Ndb* g_ndb= NULL; static Ndb_cluster_connection* g_ndb_cluster_connection= NULL; @@ -117,9 +121,12 @@ static HASH ndbcluster_open_tables; static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length, my_bool not_used __attribute__((unused))); -static void ndb_set_fragmentation(NDBTAB & tab, TABLE *table, uint pk_len); -static NDB_SHARE *get_share(const char *table_name); -static void free_share(NDB_SHARE *share); +static NDB_SHARE *get_share(const char *key, + bool create_if_not_exists= TRUE, + bool have_lock= FALSE); +static void free_share(NDB_SHARE **share, bool have_lock= FALSE); +static void real_free_share(NDB_SHARE **share); +static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len); static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len); static int unpackfrm(const void **data, uint *len, @@ -128,6 +135,33 @@ static int unpackfrm(const void **data, uint *len, static int ndb_get_table_statistics(Ndb*, const char *, struct Ndb_statistics *); +#ifndef DBUG_OFF +void print_records(TABLE *table, const char *record) +{ + if (_db_on_) + { + for (uint j= 0; j < table->s->fields; j++) + { + char buf[40]; + int pos= 0; + Field *field= table->field[j]; + const byte* field_ptr= field->ptr - table->record[0] + record; + int pack_len= field->pack_length(); + int n= pack_len < 10 ? pack_len : 10; + + for (int i= 0; i < n && pos < 20; i++) + { + pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]); + } + buf[pos]= 0; + DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf)); + } + } +} +#else +#define print_records(a,b) +#endif + // Util thread variables static pthread_t ndb_util_thread; pthread_mutex_t LOCK_ndb_util_thread; @@ -179,65 +213,70 @@ struct show_var_st ndb_status_variables[]= { {NullS, NullS, SHOW_LONG} }; +/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */ +extern Uint64 g_latest_trans_gci; + /* Error handling functions */ -struct err_code_mapping -{ - int ndb_err; - int my_err; - int show_warning; -}; +/* Note for merge: old mapping table, moved to storage/ndb/ndberror.c */ -static const err_code_mapping err_map[]= +static int ndb_to_mysql_error(const NdbError *ndberr) { - { 626, HA_ERR_KEY_NOT_FOUND, 0 }, - { 630, HA_ERR_FOUND_DUPP_KEY, 0 }, - { 893, HA_ERR_FOUND_DUPP_KEY, 0 }, - { 721, HA_ERR_TABLE_EXIST, 1 }, - { 4244, HA_ERR_TABLE_EXIST, 1 }, - - { 709, HA_ERR_NO_SUCH_TABLE, 0 }, + /* read the mysql mapped error code */ + int error= ndberr->mysql_code; - { 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 }, - { 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 }, - { 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 }, - { 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 }, - { 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 }, - - { 623, HA_ERR_RECORD_FILE_FULL, 1 }, - { 624, HA_ERR_RECORD_FILE_FULL, 1 }, - { 625, HA_ERR_RECORD_FILE_FULL, 1 }, - { 826, HA_ERR_RECORD_FILE_FULL, 1 }, - { 827, HA_ERR_RECORD_FILE_FULL, 1 }, - { 832, HA_ERR_RECORD_FILE_FULL, 1 }, - - { 284, HA_ERR_TABLE_DEF_CHANGED, 0 }, - - { 0, 1, 0 }, - - { -1, -1, 1 } -}; + switch (error) + { + /* errors for which we do not add warnings, just return mapped error code + */ + case HA_ERR_NO_SUCH_TABLE: + case HA_ERR_KEY_NOT_FOUND: + case HA_ERR_FOUND_DUPP_KEY: + return error; + + /* Mapping missing, go with the ndb error code*/ + case -1: + error= ndberr->code; + break; + /* Mapping exists, go with the mapped code */ + default: + break; + } -static int ndb_to_mysql_error(const NdbError *err) -{ - uint i; - for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++); - if (err_map[i].show_warning) - { - // Push the NDB error message as warning + /* + Push the NDB error message as warning + - Used to be able to use SHOW WARNINGS toget more info on what the error is + - Used by replication to see if the error was temporary + */ + if (ndberr->status == NdbError::TemporaryError) push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, - ER_GET_ERRMSG, ER(ER_GET_ERRMSG), - err->code, err->message, "NDB"); - } - if (err_map[i].my_err == -1) - return err->code; - return err_map[i].my_err; + ER_GET_TEMPORARY_ERRMSG, ER(ER_GET_TEMPORARY_ERRMSG), + ndberr->code, ndberr->message, "NDB"); + else + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + ndberr->code, ndberr->message, "NDB"); + return error; } +int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans) +{ + int res= trans->execute(NdbTransaction::NoCommit, + NdbTransaction::AO_IgnoreError, + h->m_force_send); + if (res == 0) + return 0; + + const NdbError &err= trans->getNdbError(); + if (err.classification != NdbError::ConstraintViolation && + err.classification != NdbError::NoDataFound) + return res; + return 0; +} inline int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) @@ -247,9 +286,11 @@ int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) if (m_batch_execute) return 0; #endif - return trans->execute(NdbTransaction::NoCommit, - NdbTransaction::AbortOnError, - h->m_force_send); + return h->m_ignore_no_key ? + execute_no_commit_ignore_no_key(h,trans) : + trans->execute(NdbTransaction::NoCommit, + NdbTransaction::AbortOnError, + h->m_force_send); } inline @@ -437,29 +478,38 @@ void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd) # The mapped error code */ -void ha_ndbcluster::invalidate_dictionary_cache(bool global) +void +ha_ndbcluster::invalidate_dictionary_cache(TABLE *table, Ndb *ndb, + const char *tabname, bool global) { - NDBDICT *dict= get_ndb()->getDictionary(); + NDBDICT *dict= ndb->getDictionary(); DBUG_ENTER("invalidate_dictionary_cache"); - DBUG_PRINT("info", ("invalidating %s", m_tabname)); + DBUG_PRINT("info", ("invalidating %s", tabname)); if (global) { - const NDBTAB *tab= dict->getTable(m_tabname); + const NDBTAB *tab= dict->getTable(tabname); if (!tab) DBUG_VOID_RETURN; if (tab->getObjectStatus() == NdbDictionary::Object::Invalid) { // Global cache has already been invalidated - dict->removeCachedTable(m_tabname); + dict->removeCachedTable(tabname); global= FALSE; } else - dict->invalidateTable(m_tabname); + dict->invalidateTable(tabname); } else - dict->removeCachedTable(m_tabname); + dict->removeCachedTable(tabname); table->s->version=0L; /* Free when thread is ready */ + DBUG_VOID_RETURN; +} + +void ha_ndbcluster::invalidate_dictionary_cache(bool global) +{ + NDBDICT *dict= get_ndb()->getDictionary(); + invalidate_dictionary_cache(table, get_ndb(), m_tabname, global); /* Invalidate indexes */ for (uint i= 0; i < table->s->keys; i++) { @@ -491,7 +541,6 @@ void ha_ndbcluster::invalidate_dictionary_cache(bool global) break; } } - DBUG_VOID_RETURN; } int ha_ndbcluster::ndb_err(NdbTransaction *trans) @@ -648,14 +697,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field, */ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, - uint fieldnr, bool *set_blob_value) + uint fieldnr, int row_offset, + bool *set_blob_value) { - const byte* field_ptr= field->ptr; - uint32 pack_len= field->pack_length(); + const byte* field_ptr= field->ptr + row_offset; + uint32 pack_len= field->pack_length(); DBUG_ENTER("set_ndb_value"); - DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s", + DBUG_PRINT("enter", ("%d: %s type: %u len=%d is_null=%s", fieldnr, field->field_name, field->type(), - pack_len, field->is_null()?"Y":"N")); + pack_len, field->is_null(row_offset) ? "Y" : "N")); DBUG_DUMP("value", (char*) field_ptr, pack_len); DBUG_ASSERT(ndb_supported_type(field->type())); @@ -666,7 +716,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, { pack_len= sizeof(empty_field); field_ptr= (byte *)&empty_field; - if (field->is_null()) + if (field->is_null(row_offset)) empty_field= 0; else empty_field= 1; @@ -675,12 +725,15 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, { if (field->type() != MYSQL_TYPE_BIT) { - if (field->is_null()) + if (field->is_null(row_offset)) + { + DBUG_PRINT("info", ("field is NULL")); // Set value to NULL DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); + } // Common implementation for most field types - DBUG_RETURN(ndb_op->setValue(fieldnr, + DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); } else // if (field->type() == MYSQL_TYPE_BIT) @@ -690,7 +743,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, // Round up bit field length to nearest word boundry pack_len= ((pack_len + 3) >> 2) << 2; DBUG_ASSERT(pack_len <= 8); - if (field->is_null()) + if (field->is_null(row_offset)) // Set value to NULL DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); DBUG_PRINT("info", ("bit field")); @@ -709,7 +762,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr); if (ndb_blob != NULL) { - if (field->is_null()) + if (field->is_null(row_offset)) DBUG_RETURN(ndb_blob->setNull() != 0); Field_blob *field_blob= (Field_blob*)field; @@ -790,8 +843,8 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) { char *buf= m_blobs_buffer + offset; uint32 len= 0xffffffff; // Max uint32 - DBUG_PRINT("value", ("read blob ptr=%x len=%u", - (UintPtr)buf, (uint)blob_len)); + DBUG_PRINT("value", ("read blob ptr=%lx len=%u", + buf, (uint) blob_len)); if (ndb_blob->readData(buf, len) != 0) DBUG_RETURN(-1); DBUG_ASSERT(len == blob_len); @@ -902,6 +955,19 @@ bool ha_ndbcluster::uses_blob_value() of table accessed in NDB */ +static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data, + uint pack_length) +{ + DBUG_ENTER("cmp_frm"); + /* + Compare FrmData in NDB with frm file from disk. + */ + if ((pack_length != ndbtab->getFrmLength()) || + (memcmp(pack_data, ndbtab->getFrmData(), pack_length))) + DBUG_RETURN(1); + DBUG_RETURN(0); +} + int ha_ndbcluster::get_metadata(const char *path) { Ndb *ndb= get_ndb(); @@ -939,8 +1005,7 @@ int ha_ndbcluster::get_metadata(const char *path) DBUG_RETURN(1); } - if ((pack_length != tab->getFrmLength()) || - (memcmp(pack_data, tab->getFrmData(), pack_length))) + if (cmp_frm(tab, pack_data, pack_length)) { if (!invalidating_ndb_table) { @@ -951,9 +1016,9 @@ int ha_ndbcluster::get_metadata(const char *path) else { DBUG_PRINT("error", - ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", + ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", pack_length, tab->getFrmLength(), - memcmp(pack_data, tab->getFrmData(), pack_length))); + memcmp(pack_data, tab->getFrmData(), pack_length))); DBUG_DUMP("pack_data", (char*)pack_data, pack_length); DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength()); error= 3; @@ -1980,7 +2045,7 @@ int ha_ndbcluster::write_row(byte *record) DBUG_ENTER("write_row"); - if (m_ignore_dup_key && table->s->primary_key != MAX_KEY) + if (!m_use_write && m_ignore_dup_key && table->s->primary_key != MAX_KEY) { int peek_res= peek_row(record); @@ -2057,7 +2122,8 @@ int ha_ndbcluster::write_row(byte *record) { Field *field= table->field[i]; if (!(field->flags & PRI_KEY_FLAG) && - set_ndb_value(op, field, i, &set_blob_value)) + (ha_get_bit_in_write_set(i + 1) || !m_use_write) && + set_ndb_value(op, field, i, record-table->record[0], &set_blob_value)) { m_skip_auto_increment= TRUE; ERR_RETURN(op->getNdbError()); @@ -2299,7 +2365,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) Field *field= table->field[i]; if (ha_get_bit_in_write_set(i+1) && (!(field->flags & PRI_KEY_FLAG)) && - set_ndb_value(op, field, i)) + set_ndb_value(op, field, i, new_data - table->record[0])) ERR_RETURN(op->getNdbError()); } @@ -2414,51 +2480,73 @@ int ha_ndbcluster::delete_row(const byte *record) set to null. */ -void ha_ndbcluster::unpack_record(byte* buf) +static void ndb_unpack_record(TABLE *table, NdbValue *value, + MY_BITMAP *defined, byte *buf) { + Field **p_field= table->field, *field= *p_field; uint row_offset= (uint) (buf - table->record[0]); - Field **field, **end; - NdbValue *value= m_value; - DBUG_ENTER("unpack_record"); + DBUG_ENTER("ndb_unpack_record"); - end= table->field + table->s->fields; - // Set null flag(s) bzero(buf, table->s->null_bytes); - for (field= table->field; - field < end; - field++, value++) + for ( ; field; + p_field++, value++, field= *p_field) { if ((*value).ptr) { - if (! ((*field)->flags & BLOB_FLAG)) + if (!(field->flags & BLOB_FLAG)) { - if ((*value).rec->isNULL()) - (*field)->set_null(row_offset); - else if ((*field)->type() == MYSQL_TYPE_BIT) + int is_null= (*value).rec->isNULL(); + if (is_null) { - uint pack_len= (*field)->pack_length(); - if (pack_len < 5) + if (is_null > 0) + { + DBUG_PRINT("info",("[%u] NULL", + (*value).rec->getColumn()->getColumnNo())); + field->set_null(row_offset); + } + else + { + DBUG_PRINT("info",("[%u] UNDEFINED", + (*value).rec->getColumn()->getColumnNo())); + bitmap_clear_bit(defined, + (*value).rec->getColumn()->getColumnNo()); + } + } + else if (field->type() == MYSQL_TYPE_BIT) + { + byte *save_field_ptr= field->ptr; + field->ptr= save_field_ptr + row_offset; + if (field->pack_length() < 5) { DBUG_PRINT("info", ("bit field H'%.8X", (*value).rec->u_32_value())); - ((Field_bit *) *field)->store((longlong) - (*value).rec->u_32_value(), - FALSE); + ((Field_bit*) field)->store((longlong) + (*value).rec->u_32_value(), FALSE); } else { DBUG_PRINT("info", ("bit field H'%.8X%.8X", - *(Uint32 *)(*value).rec->aRef(), - *((Uint32 *)(*value).rec->aRef()+1))); - ((Field_bit *) *field)->store((longlong) - (*value).rec->u_64_value(), TRUE); + *(Uint32*) (*value).rec->aRef(), + *((Uint32*) (*value).rec->aRef()+1))); + ((Field_bit*) field)->store((longlong) + (*value).rec->u_64_value(),TRUE); } + field->ptr= save_field_ptr; + DBUG_PRINT("info",("[%u] SET", + (*value).rec->getColumn()->getColumnNo())); + DBUG_DUMP("info", (const char*) field->ptr, field->field_length); + } + else + { + DBUG_PRINT("info",("[%u] SET", + (*value).rec->getColumn()->getColumnNo())); + DBUG_DUMP("info", (const char*) field->ptr, field->field_length); } } else { - NdbBlob* ndb_blob= (*value).blob; + NdbBlob *ndb_blob= (*value).blob; bool isNull= TRUE; #ifndef DBUG_OFF int ret= @@ -2466,11 +2554,16 @@ void ha_ndbcluster::unpack_record(byte* buf) ndb_blob->getNull(isNull); DBUG_ASSERT(ret == 0); if (isNull) - (*field)->set_null(row_offset); + field->set_null(row_offset); } } } - + DBUG_VOID_RETURN; +} + +void ha_ndbcluster::unpack_record(byte *buf) +{ + ndb_unpack_record(table, m_value, 0, buf); #ifndef DBUG_OFF // Read and print all values that was fetched if (table->s->primary_key == MAX_KEY) @@ -2486,7 +2579,6 @@ void ha_ndbcluster::unpack_record(byte* buf) } //print_results(); #endif - DBUG_VOID_RETURN; } /* @@ -3207,8 +3299,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) Thd_ndb *thd_ndb= get_thd_ndb(thd); Ndb *ndb= thd_ndb->ndb; - DBUG_PRINT("enter", ("thd: %x, thd_ndb: %x, thd_ndb->lock_count: %d", - thd, thd_ndb, thd_ndb->lock_count)); + DBUG_PRINT("enter", ("this: %x thd: %lx thd_ndb: %lx " + "thd_ndb->lock_count: %d", + this, thd, thd_ndb, thd_ndb->lock_count)); if (lock_type != F_UNLCK) { @@ -3461,7 +3554,8 @@ int ndbcluster_commit(THD *thd, bool all) while ((share= it++)) { pthread_mutex_lock(&share->mutex); - DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ", share->table_name, share->commit_count)); + DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ", + share->key, share->commit_count)); share->commit_count= 0; share->commit_count_lock++; pthread_mutex_unlock(&share->mutex); @@ -3790,6 +3884,10 @@ static int create_ndb_column(NDBCOL &col, return 0; } +/* + Create a table in NDB Cluster +*/ + int ha_ndbcluster::create(const char *name, TABLE *form, HA_CREATE_INFO *info) @@ -3815,7 +3913,8 @@ int ha_ndbcluster::create(const char *name, caller. Do Ndb specific stuff, such as create a .ndb file */ - my_errno= write_ndb_file(); + if ((my_errno= write_ndb_file())) + DBUG_RETURN(my_errno); DBUG_RETURN(my_errno); } @@ -3829,7 +3928,7 @@ int ha_ndbcluster::create(const char *name, if (packfrm(data, length, &pack_data, &pack_length)) DBUG_RETURN(2); - DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length)); + DBUG_PRINT("info", ("setFrm data=%lx len=%d", pack_data, pack_length)); tab.setFrm(pack_data, pack_length); my_free((char*)data, MYF(0)); my_free((char*)pack_data, MYF(0)); @@ -4027,14 +4126,22 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) if (!(orig_tab= dict->getTable(m_tabname))) ERR_RETURN(dict->getNdbError()); } + m_table= (void *)orig_tab; // Change current database to that of target table set_dbname(to); ndb->setDatabaseName(m_dbname); - if (!(result= alter_table_name(new_tabname))) + + if ((result= alter_table_name(new_tabname))) + { + DBUG_RETURN(result); + } + + // Rename .ndb file + if ((result= handler::rename_table(from, to))) { - // Rename .ndb file - result= handler::rename_table(from, to); + // ToDo in 4.1 should rollback alter table... + DBUG_RETURN(result); } DBUG_RETURN(result); @@ -4050,7 +4157,7 @@ int ha_ndbcluster::alter_table_name(const char *to) Ndb *ndb= get_ndb(); NDBDICT *dict= ndb->getDictionary(); const NDBTAB *orig_tab= (const NDBTAB *) m_table; - DBUG_ENTER("alter_table_name_table"); + DBUG_ENTER("alter_table_name"); NdbDictionary::Table new_tab= *orig_tab; new_tab.setName(to); @@ -4069,6 +4176,38 @@ int ha_ndbcluster::alter_table_name(const char *to) */ +/* static version which does not need a handler */ + +int +ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, + const char *path, + const char *db, + const char *table_name) +{ + DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); + NDBDICT *dict= ndb->getDictionary(); + + /* Drop the table from NDB */ + + int res; + if (h) + { + res= h->drop_table(); + } + else + { + ndb->setDatabaseName(db); + res= dict->dropTable(table_name); + } + + if (res) + { + DBUG_RETURN(res); + } + + DBUG_RETURN(0); +} + int ha_ndbcluster::delete_table(const char *name) { DBUG_ENTER("ha_ndbcluster::delete_table"); @@ -4081,9 +4220,8 @@ int ha_ndbcluster::delete_table(const char *name) /* Call ancestor function to delete .ndb file */ handler::delete_table(name); - - /* Drop the table from NDB */ - DBUG_RETURN(drop_table()); + + DBUG_RETURN(delete_table(this, get_ndb(),name, m_dbname, m_tabname)); } @@ -4149,6 +4287,15 @@ ulonglong ha_ndbcluster::get_auto_increment() Constructor for the NDB Cluster table handler */ +#define HA_NDBCLUSTER_TABLE_FLAGS \ + HA_REC_NOT_IN_SEQ | \ + HA_NULL_IN_KEY | \ + HA_AUTO_PART_KEY | \ + HA_NO_PREFIX_CHAR_KEYS | \ + HA_NEED_READ_RANGE_BUFFER | \ + HA_CAN_GEOMETRY | \ + HA_CAN_BIT_FIELD + ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): handler(&ndbcluster_hton, table_arg), m_active_trans(NULL), @@ -4156,13 +4303,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_table(NULL), m_table_version(-1), m_table_info(NULL), - m_table_flags(HA_REC_NOT_IN_SEQ | - HA_NULL_IN_KEY | - HA_AUTO_PART_KEY | - HA_NO_PREFIX_CHAR_KEYS | - HA_NEED_READ_RANGE_BUFFER | - HA_CAN_GEOMETRY | - HA_CAN_BIT_FIELD), + m_table_flags(HA_NDBCLUSTER_TABLE_FLAGS), m_share(0), m_part_info(NULL), m_use_partition_function(FALSE), @@ -4170,6 +4311,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_use_write(FALSE), m_ignore_dup_key(FALSE), m_primary_key_update(FALSE), + m_ignore_no_key(FALSE), m_rows_to_insert((ha_rows) 1), m_rows_inserted((ha_rows) 0), m_bulk_insert_rows((ha_rows) 1024), @@ -4223,7 +4365,9 @@ ha_ndbcluster::~ha_ndbcluster() DBUG_ENTER("~ha_ndbcluster"); if (m_share) - free_share(m_share); + { + free_share(&m_share); + } release_metadata(); my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer= 0; @@ -4256,8 +4400,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) int res; KEY *key; DBUG_ENTER("open"); - DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d", - name, mode, test_if_locked)); + DBUG_PRINT("enter", ("this: %d name: %s mode: %d test_if_locked: %d", + this, name, mode, test_if_locked)); // Setup ref_length to make room for the whole // primary key to be written in the ref variable @@ -4277,7 +4421,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) set_tabname(name); if (check_ndb_connection()) { - free_share(m_share); m_share= 0; + free_share(&m_share); + m_share= 0; DBUG_RETURN(HA_ERR_NO_CONNECTION); } @@ -4306,7 +4451,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) int ha_ndbcluster::close(void) { DBUG_ENTER("close"); - free_share(m_share); m_share= 0; + free_share(&m_share); + m_share= 0; release_metadata(); DBUG_RETURN(0); } @@ -4509,21 +4655,24 @@ int ndbcluster_drop_database(const char *path) ERR_RETURN(dict->getNdbError()); for (i= 0 ; i < list.count ; i++) { - NdbDictionary::Dictionary::List::Element& t= list.elements[i]; - DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name)); + NdbDictionary::Dictionary::List::Element& elmt= list.elements[i]; + DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name)); // Add only tables that belongs to db - if (my_strcasecmp(system_charset_info, t.database, dbname)) + if (my_strcasecmp(system_charset_info, elmt.database, dbname)) continue; - DBUG_PRINT("info", ("%s must be dropped", t.name)); - drop_list.push_back(thd->strdup(t.name)); + DBUG_PRINT("info", ("%s must be dropped", elmt.name)); + drop_list.push_back(thd->strdup(elmt.name)); } // Drop any tables belonging to database + char full_path[FN_REFLEN]; + char *tmp= strxnmov(full_path, FN_REFLEN, share_prefix, dbname, "/", NullS); ndb->setDatabaseName(dbname); List_iterator_fast<char> it(drop_list); while ((tabname=it++)) { - if (dict->dropTable(tabname)) + strxnmov(tmp, FN_REFLEN - (tmp - full_path), tabname, NullS); + if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname)) { const NdbError err= dict->getNdbError(); if (err.code != 709) @@ -4536,6 +4685,92 @@ int ndbcluster_drop_database(const char *path) DBUG_RETURN(ret); } +/* + find all tables in ndb and discover those needed +*/ +static int ndbcluster_find_all_files(THD *thd) +{ + DBUG_ENTER("ndbcluster_find_all_files"); + Ndb* ndb; + char key[FN_REFLEN]; + NdbDictionary::Dictionary::List list; + + if (!(ndb= check_ndb_in_thd(thd))) + DBUG_RETURN(HA_ERR_NO_CONNECTION); + + NDBDICT *dict= ndb->getDictionary(); + + int unhandled, retries= 5; + do + { + if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) + ERR_RETURN(dict->getNdbError()); + unhandled= 0; + for (uint i= 0 ; i < list.count ; i++) + { + NDBDICT::List::Element& elmt= list.elements[i]; + DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name)); + if (!(elmt.state == NDBOBJ::StateBuilding || + elmt.state == NDBOBJ::StateOnline)) + { + sql_print_information("NDB: skipping setup table %s.%s, in state %d", + elmt.database, elmt.name, elmt.state); + continue; + } + + ndb->setDatabaseName(elmt.database); + const NDBTAB *ndbtab; + + if (!(ndbtab= dict->getTable(elmt.name))) + { + sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", + elmt.database, elmt.name, + dict->getNdbError().code, + dict->getNdbError().message); + unhandled++; + continue; + } + + if (ndbtab->getFrmLength() == 0) + continue; + + strxnmov(key, FN_LEN, mysql_data_home, "/", + elmt.database, "/", elmt.name, NullS); + const void *data= 0, *pack_data= 0; + uint length, pack_length; + int discover= 0; + if (readfrm(key, &data, &length) || + packfrm(data, length, &pack_data, &pack_length)) + { + discover= 1; + sql_print_information("NDB: missing frm for %s.%s, discovering...", + elmt.database, elmt.name); + } + else if (cmp_frm(ndbtab, pack_data, pack_length)) + { + discover= 1; + sql_print_information("NDB: mismatch in frm for %s.%s, discovering...", + elmt.database, elmt.name); + } + my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR)); + my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR)); + + if (discover) + { + /* ToDo 4.1 database needs to be created if missing */ + pthread_mutex_lock(&LOCK_open); + if (ha_create_table_from_engine(thd, elmt.database, elmt.name)) + { + /* ToDo 4.1 handle error */ + } + pthread_mutex_unlock(&LOCK_open); + } + } + } + while (unhandled && retries--); + + DBUG_RETURN(0); +} int ndbcluster_find_files(THD *thd,const char *db,const char *path, const char *wild, bool dir, List<char> *files) @@ -4547,7 +4782,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, Ndb* ndb; char name[FN_REFLEN]; HASH ndb_tables, ok_tables; - NdbDictionary::Dictionary::List list; + NDBDICT::List list; if (!(ndb= check_ndb_in_thd(thd))) DBUG_RETURN(HA_ERR_NO_CONNECTION); @@ -4578,11 +4813,11 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, for (i= 0 ; i < list.count ; i++) { - NdbDictionary::Dictionary::List::Element& t= list.elements[i]; - DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name)); + NDBDICT::List::Element& elmt= list.elements[i]; + DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name)); // Add only tables that belongs to db - if (my_strcasecmp(system_charset_info, t.database, db)) + if (my_strcasecmp(system_charset_info, elmt.database, db)) continue; // Apply wildcard to list of tables in NDB @@ -4590,14 +4825,14 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, { if (lower_case_table_names) { - if (wild_case_compare(files_charset_info, t.name, wild)) + if (wild_case_compare(files_charset_info, elmt.name, wild)) continue; } - else if (wild_compare(t.name,wild,0)) + else if (wild_compare(elmt.name,wild,0)) continue; } - DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name)); - my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name)); + DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", elmt.name)); + my_hash_insert(&ndb_tables, (byte*)thd->strdup(elmt.name)); } char *file_name; @@ -4645,10 +4880,15 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, file_name= hash_element(&ndb_tables, i); if (!hash_search(&ok_tables, file_name, strlen(file_name))) { - DBUG_PRINT("info", ("%s must be discovered", file_name)); - // File is in list of ndb tables and not in ok_tables - // This table need to be created - create_list.push_back(thd->strdup(file_name)); + strxnmov(name, sizeof(name), + mysql_data_home, "/", db, "/", file_name, reg_ext, NullS); + if (access(name, F_OK)) + { + DBUG_PRINT("info", ("%s must be discovered", file_name)); + // File is in list of ndb tables and not in ok_tables + // This table need to be created + create_list.push_back(thd->strdup(file_name)); + } } } @@ -4704,6 +4944,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, static int connect_callback() { update_status_variables(g_ndb_cluster_connection); + pthread_cond_signal(&COND_ndb_util_thread); return 0; } @@ -4778,6 +5019,7 @@ bool ndbcluster_init() (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0, (hash_get_key) ndbcluster_get_key,0,0); pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST); pthread_cond_init(&COND_ndb_util_thread, NULL); @@ -4840,6 +5082,7 @@ bool ndbcluster_end() pthread_mutex_destroy(&LOCK_ndb_util_thread); pthread_cond_destroy(&COND_ndb_util_thread); ndbcluster_inited= 0; + ndbcluster_util_inited= 0; DBUG_RETURN(0); } @@ -5112,7 +5355,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, char name[FN_REFLEN]; NDB_SHARE *share; - (void)strxnmov(name, FN_REFLEN, "./",dbname,"/",tabname,NullS); + (void)strxnmov(name, FN_REFLEN, share_prefix, dbname, "/", tabname, NullS); DBUG_PRINT("enter", ("name: %s", name)); pthread_mutex_lock(&ndbcluster_mutex); if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables, @@ -5120,8 +5363,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, strlen(name)))) { pthread_mutex_unlock(&ndbcluster_mutex); - DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", - name)); + DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", name)); DBUG_RETURN(1); } share->use_count++; @@ -5136,7 +5378,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, DBUG_PRINT("info", ("Getting commit_count: %llu from share", share->commit_count)); pthread_mutex_unlock(&share->mutex); - free_share(share); + free_share(&share); DBUG_RETURN(0); } } @@ -5151,7 +5393,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, struct Ndb_statistics stat; if (ndb_get_table_statistics(ndb, tabname, &stat)) { - free_share(share); + free_share(&share); DBUG_RETURN(1); } @@ -5168,7 +5410,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, *commit_count= 0; } pthread_mutex_unlock(&share->mutex); - free_share(share); + free_share(&share); DBUG_RETURN(0); } @@ -5305,6 +5547,60 @@ ha_ndbcluster::register_query_cache_table(THD *thd, } +#ifndef DBUG_OFF +static void dbug_print_table(const char *info, TABLE *table) +{ + if (table == 0) + { + DBUG_PRINT("info",("%s: (null)", info)); + return; + } + DBUG_PRINT("info", + ("%s: %s.%s s->fields: %d " + "reclength: %d rec_buff_length: %d record[0]: %lx " + "record[1]: %lx", + info, + table->s->db, + table->s->table_name, + table->s->fields, + table->s->reclength, + table->s->rec_buff_length, + table->record[0], + table->record[1])); + + for (unsigned int i= 0; i < table->s->fields; i++) + { + Field *f= table->field[i]; + DBUG_PRINT("info", + ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d " + "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]", + i, + f->field_name, + f->flags, + (f->flags & PRI_KEY_FLAG) ? "pri" : "attr", + (f->flags & NOT_NULL_FLAG) ? "" : ",nullable", + (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed", + (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "", + (f->flags & BLOB_FLAG) ? ",blob" : "", + (f->flags & BINARY_FLAG) ? ",binary" : "", + f->real_type(), + f->pack_length(), + f->ptr, f->ptr - table->record[0], + f->null_bit, + f->null_ptr, (byte*) f->null_ptr - table->record[0])); + if (f->type() == MYSQL_TYPE_BIT) + { + Field_bit *g= (Field_bit*) f; + DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] " + "bit_ofs: %u bit_len: %u", + g->field_length, g->bit_ptr, + (byte*) g->bit_ptr-table->record[0], + g->bit_ofs, g->bit_len)); + } + } +} +#endif + /* Handling the shared NDB_SHARE structure that is needed to provide table locking. @@ -5313,68 +5609,195 @@ ha_ndbcluster::register_query_cache_table(THD *thd, data we want to or can share. */ -static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length, +static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length, my_bool not_used __attribute__((unused))) { - *length=share->table_name_length; - return (byte*) share->table_name; + *length= share->key_length; + return (byte*) share->key; } -static NDB_SHARE* get_share(const char *table_name) +#ifndef DBUG_OFF +static void dbug_print_open_tables() +{ + DBUG_ENTER("dbug_print_open_tables"); + for (uint i= 0; i < ndbcluster_open_tables.records; i++) + { + NDB_SHARE *share= (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i); + DBUG_PRINT("share", + ("[%d] 0x%lx key: %s key_length: %d", + i, share, share->key, share->key_length)); + DBUG_PRINT("share", + ("db.tablename: %s.%s use_count: %d commit_count: %d", + share->db, share->table_name, + share->use_count, share->commit_count)); + } + DBUG_VOID_RETURN; +} +#else +#define dbug_print_open_tables() +#endif + +/* + Increase refcount on existing share. + Always returns share and cannot fail. +*/ +static NDB_SHARE *get_share(NDB_SHARE *share) { - NDB_SHARE *share; pthread_mutex_lock(&ndbcluster_mutex); - uint length=(uint) strlen(table_name); - if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables, - (byte*) table_name, - length))) + share->use_count++; + + dbug_print_open_tables(); + + DBUG_PRINT("get_share", + ("0x%lx key: %s key_length: %d", + share, share->key, share->key_length)); + DBUG_PRINT("get_share", + ("db.tablename: %s.%s use_count: %d commit_count: %d", + share->db, share->table_name, + share->use_count, share->commit_count)); + pthread_mutex_unlock(&ndbcluster_mutex); + return share; +} + +/* + Get a share object for key + + Returns share for key, and increases the refcount on the share. + + create_if_not_exists == TRUE: + creates share if it does not alreade exist + returns 0 only due to out of memory, and then sets my_error + + create_if_not_exists == FALSE: + returns 0 if share does not exist + + have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken +*/ +static NDB_SHARE *get_share(const char *key, bool create_if_not_exists, + bool have_lock) +{ + NDB_SHARE *share; + if (!have_lock) + pthread_mutex_lock(&ndbcluster_mutex); + uint length= (uint) strlen(key); + if (!(share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, + (byte*) key, + length))) { - if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1, + if (!create_if_not_exists) + { + DBUG_PRINT("error", ("get_share: %s does not exist", key)); + if (!have_lock) + pthread_mutex_unlock(&ndbcluster_mutex); + return 0; + } + if ((share= (NDB_SHARE*) my_malloc(sizeof(*share), MYF(MY_WME | MY_ZEROFILL)))) { - share->table_name_length=length; - share->table_name=(char*) (share+1); - strmov(share->table_name,table_name); + MEM_ROOT **root_ptr= + my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC); + MEM_ROOT *old_root= *root_ptr; + init_sql_alloc(&share->mem_root, 1024, 0); + *root_ptr= &share->mem_root; // remember to reset before return + + /* enough space for key, db, and table_name */ + share->key= alloc_root(*root_ptr, 2 * (length + 1)); + share->key_length= length; + strmov(share->key, key); if (my_hash_insert(&ndbcluster_open_tables, (byte*) share)) { - pthread_mutex_unlock(&ndbcluster_mutex); - my_free((gptr) share,0); + free_root(&share->mem_root, MYF(0)); + my_free((gptr) share, 0); + *root_ptr= old_root; + if (!have_lock) + pthread_mutex_unlock(&ndbcluster_mutex); return 0; } thr_lock_init(&share->lock); - pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST); + pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST); share->commit_count= 0; share->commit_count_lock= 0; + share->db= share->key + length + 1; + ha_ndbcluster::set_dbname(key, share->db); + share->table_name= share->db + strlen(share->db) + 1; + ha_ndbcluster::set_tabname(key, share->table_name); + *root_ptr= old_root; } else { - DBUG_PRINT("error", ("Failed to alloc share")); - pthread_mutex_unlock(&ndbcluster_mutex); + DBUG_PRINT("error", ("get_share: failed to alloc share")); + if (!have_lock) + pthread_mutex_unlock(&ndbcluster_mutex); + my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*share)); return 0; } } share->use_count++; - DBUG_PRINT("share", - ("table_name: %s, length: %d, use_count: %d, commit_count: %d", - share->table_name, share->table_name_length, share->use_count, - share->commit_count)); - pthread_mutex_unlock(&ndbcluster_mutex); + dbug_print_open_tables(); + + DBUG_PRINT("get_share", + ("0x%lx key: %s key_length: %d key: %s", + share, share->key, share->key_length, key)); + DBUG_PRINT("get_share", + ("db.tablename: %s.%s use_count: %d commit_count: %d", + share->db, share->table_name, + share->use_count, share->commit_count)); + if (!have_lock) + pthread_mutex_unlock(&ndbcluster_mutex); return share; } +static void real_free_share(NDB_SHARE **share) +{ + DBUG_PRINT("real_free_share", + ("0x%lx key: %s key_length: %d", + (*share), (*share)->key, (*share)->key_length)); + DBUG_PRINT("real_free_share", + ("db.tablename: %s.%s use_count: %d commit_count: %d", + (*share)->db, (*share)->table_name, + (*share)->use_count, (*share)->commit_count)); + + hash_delete(&ndbcluster_open_tables, (byte*) *share); + thr_lock_delete(&(*share)->lock); + pthread_mutex_destroy(&(*share)->mutex); + free_root(&(*share)->mem_root, MYF(0)); + + my_free((gptr) *share, MYF(0)); + *share= 0; + + dbug_print_open_tables(); +} + +/* + decrease refcount of share + calls real_free_share when refcount reaches 0 -static void free_share(NDB_SHARE *share) + have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken +*/ +static void free_share(NDB_SHARE **share, bool have_lock) { - pthread_mutex_lock(&ndbcluster_mutex); - if (!--share->use_count) + if (!have_lock) + pthread_mutex_lock(&ndbcluster_mutex); + if ((*share)->util_lock == current_thd) + (*share)->util_lock= 0; + if (!--(*share)->use_count) { - hash_delete(&ndbcluster_open_tables, (byte*) share); - thr_lock_delete(&share->lock); - pthread_mutex_destroy(&share->mutex); - my_free((gptr) share, MYF(0)); + real_free_share(share); } - pthread_mutex_unlock(&ndbcluster_mutex); + else + { + dbug_print_open_tables(); + DBUG_PRINT("free_share", + ("0x%lx key: %s key_length: %d", + *share, (*share)->key, (*share)->key_length)); + DBUG_PRINT("free_share", + ("db.tablename: %s.%s use_count: %d commit_count: %d", + (*share)->db, (*share)->table_name, + (*share)->use_count, (*share)->commit_count)); + } + if (!have_lock) + pthread_mutex_unlock(&ndbcluster_mutex); } @@ -5405,14 +5828,14 @@ static int packfrm(const void *data, uint len, uint blob_len; frm_blob_struct* blob; DBUG_ENTER("packfrm"); - DBUG_PRINT("enter", ("data: %x, len: %d", data, len)); + DBUG_PRINT("enter", ("data: 0x%lx len: %d", data, len)); error= 1; org_len= len; if (my_compress((byte*)data, &org_len, &comp_len)) goto err; - DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len)); + DBUG_PRINT("info", ("org_len: %d comp_len: %d", org_len, comp_len)); DBUG_DUMP("compressed", (char*)data, org_len); error= 2; @@ -5432,7 +5855,8 @@ static int packfrm(const void *data, uint len, *pack_len= blob_len; error= 0; - DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len)); + DBUG_PRINT("exit", + ("pack_data: 0x%lx pack_len: %d", *pack_data, *pack_len)); err: DBUG_RETURN(error); @@ -5446,7 +5870,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len, byte *data; ulong complen, orglen, ver; DBUG_ENTER("unpackfrm"); - DBUG_PRINT("enter", ("pack_data: %x", pack_data)); + DBUG_PRINT("enter", ("pack_data: 0x%lx", pack_data)); complen= uint4korr((char*)&blob->head.complen); orglen= uint4korr((char*)&blob->head.orglen); @@ -5471,7 +5895,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len, *unpack_data= data; *unpack_len= complen; - DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len)); + DBUG_PRINT("exit", ("frmdata: 0x%lx, len: %d", *unpack_data, *unpack_len)); DBUG_RETURN(0); } @@ -5484,11 +5908,10 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, DBUG_ENTER("ndb_get_table_statistics"); DBUG_PRINT("enter", ("table: %s", table)); NdbTransaction* pTrans= ndb->startTransaction(); + if (pTrans == NULL) + ERR_RETURN(ndb->getNdbError()); do { - if (pTrans == NULL) - break; - NdbScanOperation* pOp= pTrans->getNdbScanOperation(table); if (pOp == NULL) break; @@ -6010,6 +6433,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) THD *thd; /* needs to be first for thread_stack */ Ndb* ndb; struct timespec abstime; + List<NDB_SHARE> util_open_tables; my_thread_init(); DBUG_ENTER("ndb_util_thread"); @@ -6030,10 +6454,51 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) delete ndb; DBUG_RETURN(NULL); } + thd->init_for_queries(); + thd->version=refresh_version; + thd->set_time(); + thd->main_security_ctx.host_or_ip= ""; + thd->client_capabilities = 0; + my_net_init(&thd->net, 0); + thd->main_security_ctx.master_access= ~0; + thd->main_security_ctx.priv_user = 0; + + /* + wait for mysql server to start + */ + pthread_mutex_lock(&LOCK_server_started); + while (!mysqld_server_started) + pthread_cond_wait(&COND_server_started, &LOCK_server_started); + pthread_mutex_unlock(&LOCK_server_started); + + /* + Wait for cluster to start + */ + pthread_mutex_lock(&LOCK_ndb_util_thread); + while (!ndb_cluster_node_id) + { + /* ndb not connected yet */ + set_timespec(abstime, 1); + pthread_cond_timedwait(&COND_ndb_util_thread, + &LOCK_ndb_util_thread, + &abstime); + if (abort_loop) + { + pthread_mutex_unlock(&LOCK_ndb_util_thread); + goto ndb_util_thread_end; + } + } + pthread_mutex_unlock(&LOCK_ndb_util_thread); + + /* + Get all table definitions from the storage node + */ + ndbcluster_find_all_files(thd); + + ndbcluster_util_inited= 1; - List<NDB_SHARE> util_open_tables; set_timespec(abstime, 0); - for (;;) + for (;!abort_loop;) { pthread_mutex_lock(&LOCK_ndb_util_thread); @@ -6041,10 +6506,10 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) &LOCK_ndb_util_thread, &abstime); pthread_mutex_unlock(&LOCK_ndb_util_thread); - +#ifdef NDB_EXTRA_DEBUG_UTIL_THREAD DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d", ndb_cache_check_time)); - +#endif if (abort_loop) break; /* Shutting down server */ @@ -6075,20 +6540,12 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) List_iterator_fast<NDB_SHARE> it(util_open_tables); while ((share= it++)) { - /* Split tab- and dbname */ - char buf[FN_REFLEN]; - char *tabname, *db; - uint length= dirname_length(share->table_name); - tabname= share->table_name+length; - memcpy(buf, share->table_name, length-1); - buf[length-1]= 0; - db= buf+dirname_length(buf); DBUG_PRINT("ndb_util_thread", ("Fetching commit count for: %s", - share->table_name)); + share->key)); /* Contact NDB to get commit count for table */ - ndb->setDatabaseName(db); + ndb->setDatabaseName(share->db); struct Ndb_statistics stat; uint lock; @@ -6096,17 +6553,17 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) lock= share->commit_count_lock; pthread_mutex_unlock(&share->mutex); - if (ndb_get_table_statistics(ndb, tabname, &stat) == 0) + if (ndb_get_table_statistics(ndb, share->table_name, &stat) == 0) { DBUG_PRINT("ndb_util_thread", ("Table: %s, commit_count: %llu, rows: %llu", - share->table_name, stat.commit_count, stat.row_count)); + share->key, stat.commit_count, stat.row_count)); } else { DBUG_PRINT("ndb_util_thread", ("Error: Could not get commit count for table %s", - share->table_name)); + share->key)); stat.commit_count= 0; } @@ -6116,7 +6573,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) pthread_mutex_unlock(&share->mutex); /* Decrease the use count and possibly free share */ - free_share(share); + free_share(&share); } /* Clear the list of open tables */ @@ -6143,7 +6600,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) abstime.tv_nsec-= 1000000000; } } - +ndb_util_thread_end: + net_end(&thd->net); thd->cleanup(); delete thd; delete ndb; @@ -7480,6 +7938,9 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack, DBUG_RETURN(0); } +/* + Implements the SHOW NDB STATUS command. +*/ int ndbcluster_show_status(THD* thd) { |