path: root/sql/ha_ndbcluster.cc
author    unknown <tomas@poseidon.ndb.mysql.com[tomas]>  2005-11-06 00:20:37 +0100
committer unknown <tomas@poseidon.ndb.mysql.com[tomas]>  2005-11-06 00:20:37 +0100
commit    65ab8b01f54f8f515dae4a61ed8a8eb927c624f3 (patch)
tree      916e0aadd51f3c57e92a75aa245cd68b768b726a /sql/ha_ndbcluster.cc
parent    2dc00e6a6927022e21b824cc1da4e81792efeaa7 (diff)
download  mariadb-git-65ab8b01f54f8f515dae4a61ed8a8eb927c624f3.tar.gz
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all error code mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
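The core of the error-handling change visible in the hunks below is that the handler's own err_map[] table is removed: the NDB API now carries a MySQL error code on each NdbError (the mapping lives in storage/ndb/ndberror.c), and the handler pushes the NDB cause as a warning so SHOW WARNINGS and replication can inspect it. The following is only a condensed sketch of that flow, using identifiers that appear in the diff; the patch's actual function is ndb_to_mysql_error, and report_ndb_error_sketch is an illustrative name.

static int report_ndb_error_sketch(const NdbError *ndberr)
{
  /* the mysqld error code is now carried on the NdbError itself;
     the mapping table lives in storage/ndb/ndberror.c */
  int error= ndberr->mysql_code;

  /* common lookup misses and duplicate keys: return without a warning */
  if (error == HA_ERR_NO_SUCH_TABLE ||
      error == HA_ERR_KEY_NOT_FOUND ||
      error == HA_ERR_FOUND_DUPP_KEY)
    return error;

  if (error == -1)                   /* no mapping: fall back to the NDB code */
    error= ndberr->code;

  /* always record the NDB cause: SHOW WARNINGS shows it to the user,
     and replication checks for the temporary-error variant */
  push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                      ndberr->status == NdbError::TemporaryError ?
                        ER_GET_TEMPORARY_ERRMSG : ER_GET_ERRMSG,
                      ndberr->status == NdbError::TemporaryError ?
                        ER(ER_GET_TEMPORARY_ERRMSG) : ER(ER_GET_ERRMSG),
                      ndberr->code, ndberr->message, "NDB");
  return error;
}

The patch itself splits the two warning cases with an if/else; the ternary above only keeps the sketch short.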
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc  885
1 file changed, 673 insertions, 212 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 2340cc006f6..fc31f4854ab 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -46,6 +46,7 @@ static const int parallelism= 0;
static const int max_transactions= 3; // should really be 2 but there is a transaction too much allocated when lock table is used
static const char *ha_ndb_ext=".ndb";
+static const char share_prefix[]= "./";
static int ndbcluster_close_connection(THD *thd);
static int ndbcluster_commit(THD *thd, bool all);
@@ -99,12 +100,15 @@ handlerton ndbcluster_hton = {
}
// Typedefs for long names
+typedef NdbDictionary::Object NDBOBJ;
typedef NdbDictionary::Column NDBCOL;
typedef NdbDictionary::Table NDBTAB;
typedef NdbDictionary::Index NDBINDEX;
typedef NdbDictionary::Dictionary NDBDICT;
+typedef NdbDictionary::Event NDBEVENT;
-bool ndbcluster_inited= FALSE;
+static int ndbcluster_inited= 0;
+static int ndbcluster_util_inited= 0;
static Ndb* g_ndb= NULL;
static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
@@ -117,9 +121,12 @@ static HASH ndbcluster_open_tables;
static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)));
-static void ndb_set_fragmentation(NDBTAB & tab, TABLE *table, uint pk_len);
-static NDB_SHARE *get_share(const char *table_name);
-static void free_share(NDB_SHARE *share);
+static NDB_SHARE *get_share(const char *key,
+ bool create_if_not_exists= TRUE,
+ bool have_lock= FALSE);
+static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
+static void real_free_share(NDB_SHARE **share);
+static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
static int unpackfrm(const void **data, uint *len,
@@ -128,6 +135,33 @@ static int unpackfrm(const void **data, uint *len,
static int ndb_get_table_statistics(Ndb*, const char *,
struct Ndb_statistics *);
+#ifndef DBUG_OFF
+void print_records(TABLE *table, const char *record)
+{
+ if (_db_on_)
+ {
+ for (uint j= 0; j < table->s->fields; j++)
+ {
+ char buf[40];
+ int pos= 0;
+ Field *field= table->field[j];
+ const byte* field_ptr= field->ptr - table->record[0] + record;
+ int pack_len= field->pack_length();
+ int n= pack_len < 10 ? pack_len : 10;
+
+ for (int i= 0; i < n && pos < 20; i++)
+ {
+ pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
+ }
+ buf[pos]= 0;
+ DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
+ }
+ }
+}
+#else
+#define print_records(a,b)
+#endif
+
// Util thread variables
static pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
@@ -179,65 +213,70 @@ struct show_var_st ndb_status_variables[]= {
{NullS, NullS, SHOW_LONG}
};
+/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
+extern Uint64 g_latest_trans_gci;
+
/*
Error handling functions
*/
-struct err_code_mapping
-{
- int ndb_err;
- int my_err;
- int show_warning;
-};
+/* Note for merge: old mapping table, moved to storage/ndb/ndberror.c */
-static const err_code_mapping err_map[]=
+static int ndb_to_mysql_error(const NdbError *ndberr)
{
- { 626, HA_ERR_KEY_NOT_FOUND, 0 },
- { 630, HA_ERR_FOUND_DUPP_KEY, 0 },
- { 893, HA_ERR_FOUND_DUPP_KEY, 0 },
- { 721, HA_ERR_TABLE_EXIST, 1 },
- { 4244, HA_ERR_TABLE_EXIST, 1 },
-
- { 709, HA_ERR_NO_SUCH_TABLE, 0 },
+ /* read the mysql mapped error code */
+ int error= ndberr->mysql_code;
- { 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
-
- { 623, HA_ERR_RECORD_FILE_FULL, 1 },
- { 624, HA_ERR_RECORD_FILE_FULL, 1 },
- { 625, HA_ERR_RECORD_FILE_FULL, 1 },
- { 826, HA_ERR_RECORD_FILE_FULL, 1 },
- { 827, HA_ERR_RECORD_FILE_FULL, 1 },
- { 832, HA_ERR_RECORD_FILE_FULL, 1 },
-
- { 284, HA_ERR_TABLE_DEF_CHANGED, 0 },
-
- { 0, 1, 0 },
-
- { -1, -1, 1 }
-};
+ switch (error)
+ {
+ /* errors for which we do not add warnings, just return mapped error code
+ */
+ case HA_ERR_NO_SUCH_TABLE:
+ case HA_ERR_KEY_NOT_FOUND:
+ case HA_ERR_FOUND_DUPP_KEY:
+ return error;
+
+ /* Mapping missing, go with the ndb error code*/
+ case -1:
+ error= ndberr->code;
+ break;
+ /* Mapping exists, go with the mapped code */
+ default:
+ break;
+ }
-static int ndb_to_mysql_error(const NdbError *err)
-{
- uint i;
- for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++);
- if (err_map[i].show_warning)
- {
- // Push the NDB error message as warning
+ /*
+ Push the NDB error message as warning
+    - Used to be able to use SHOW WARNINGS to get more info on what the error is
+ - Used by replication to see if the error was temporary
+ */
+ if (ndberr->status == NdbError::TemporaryError)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- err->code, err->message, "NDB");
- }
- if (err_map[i].my_err == -1)
- return err->code;
- return err_map[i].my_err;
+ ER_GET_TEMPORARY_ERRMSG, ER(ER_GET_TEMPORARY_ERRMSG),
+ ndberr->code, ndberr->message, "NDB");
+ else
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ ndberr->code, ndberr->message, "NDB");
+ return error;
}
+int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
+{
+ int res= trans->execute(NdbTransaction::NoCommit,
+ NdbTransaction::AO_IgnoreError,
+ h->m_force_send);
+ if (res == 0)
+ return 0;
+
+ const NdbError &err= trans->getNdbError();
+ if (err.classification != NdbError::ConstraintViolation &&
+ err.classification != NdbError::NoDataFound)
+ return res;
+ return 0;
+}
inline
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
@@ -247,9 +286,11 @@ int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
if (m_batch_execute)
return 0;
#endif
- return trans->execute(NdbTransaction::NoCommit,
- NdbTransaction::AbortOnError,
- h->m_force_send);
+ return h->m_ignore_no_key ?
+ execute_no_commit_ignore_no_key(h,trans) :
+ trans->execute(NdbTransaction::NoCommit,
+ NdbTransaction::AbortOnError,
+ h->m_force_send);
}
inline
@@ -437,29 +478,38 @@ void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
# The mapped error code
*/
-void ha_ndbcluster::invalidate_dictionary_cache(bool global)
+void
+ha_ndbcluster::invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
+ const char *tabname, bool global)
{
- NDBDICT *dict= get_ndb()->getDictionary();
+ NDBDICT *dict= ndb->getDictionary();
DBUG_ENTER("invalidate_dictionary_cache");
- DBUG_PRINT("info", ("invalidating %s", m_tabname));
+ DBUG_PRINT("info", ("invalidating %s", tabname));
if (global)
{
- const NDBTAB *tab= dict->getTable(m_tabname);
+ const NDBTAB *tab= dict->getTable(tabname);
if (!tab)
DBUG_VOID_RETURN;
if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
// Global cache has already been invalidated
- dict->removeCachedTable(m_tabname);
+ dict->removeCachedTable(tabname);
global= FALSE;
}
else
- dict->invalidateTable(m_tabname);
+ dict->invalidateTable(tabname);
}
else
- dict->removeCachedTable(m_tabname);
+ dict->removeCachedTable(tabname);
table->s->version=0L; /* Free when thread is ready */
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::invalidate_dictionary_cache(bool global)
+{
+ NDBDICT *dict= get_ndb()->getDictionary();
+ invalidate_dictionary_cache(table, get_ndb(), m_tabname, global);
/* Invalidate indexes */
for (uint i= 0; i < table->s->keys; i++)
{
@@ -491,7 +541,6 @@ void ha_ndbcluster::invalidate_dictionary_cache(bool global)
break;
}
}
- DBUG_VOID_RETURN;
}
int ha_ndbcluster::ndb_err(NdbTransaction *trans)
@@ -648,14 +697,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
*/
int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr, bool *set_blob_value)
+ uint fieldnr, int row_offset,
+ bool *set_blob_value)
{
- const byte* field_ptr= field->ptr;
- uint32 pack_len= field->pack_length();
+ const byte* field_ptr= field->ptr + row_offset;
+ uint32 pack_len= field->pack_length();
DBUG_ENTER("set_ndb_value");
- DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s",
+ DBUG_PRINT("enter", ("%d: %s type: %u len=%d is_null=%s",
fieldnr, field->field_name, field->type(),
- pack_len, field->is_null()?"Y":"N"));
+ pack_len, field->is_null(row_offset) ? "Y" : "N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len);
DBUG_ASSERT(ndb_supported_type(field->type()));
@@ -666,7 +716,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
{
pack_len= sizeof(empty_field);
field_ptr= (byte *)&empty_field;
- if (field->is_null())
+ if (field->is_null(row_offset))
empty_field= 0;
else
empty_field= 1;
@@ -675,12 +725,15 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
{
if (field->type() != MYSQL_TYPE_BIT)
{
- if (field->is_null())
+ if (field->is_null(row_offset))
+ {
+ DBUG_PRINT("info", ("field is NULL"));
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr,
(char*)NULL, pack_len) != 0));
+ }
// Common implementation for most field types
- DBUG_RETURN(ndb_op->setValue(fieldnr,
+ DBUG_RETURN(ndb_op->setValue(fieldnr,
(char*)field_ptr, pack_len) != 0);
}
else // if (field->type() == MYSQL_TYPE_BIT)
@@ -690,7 +743,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
// Round up bit field length to nearest word boundary
pack_len= ((pack_len + 3) >> 2) << 2;
DBUG_ASSERT(pack_len <= 8);
- if (field->is_null())
+ if (field->is_null(row_offset))
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
DBUG_PRINT("info", ("bit field"));
@@ -709,7 +762,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
if (ndb_blob != NULL)
{
- if (field->is_null())
+ if (field->is_null(row_offset))
DBUG_RETURN(ndb_blob->setNull() != 0);
Field_blob *field_blob= (Field_blob*)field;
@@ -790,8 +843,8 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
{
char *buf= m_blobs_buffer + offset;
uint32 len= 0xffffffff; // Max uint32
- DBUG_PRINT("value", ("read blob ptr=%x len=%u",
- (UintPtr)buf, (uint)blob_len));
+ DBUG_PRINT("value", ("read blob ptr=%lx len=%u",
+ buf, (uint) blob_len));
if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len);
@@ -902,6 +955,19 @@ bool ha_ndbcluster::uses_blob_value()
of table accessed in NDB
*/
+static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
+ uint pack_length)
+{
+ DBUG_ENTER("cmp_frm");
+ /*
+ Compare FrmData in NDB with frm file from disk.
+ */
+ if ((pack_length != ndbtab->getFrmLength()) ||
+ (memcmp(pack_data, ndbtab->getFrmData(), pack_length)))
+ DBUG_RETURN(1);
+ DBUG_RETURN(0);
+}
+
int ha_ndbcluster::get_metadata(const char *path)
{
Ndb *ndb= get_ndb();
@@ -939,8 +1005,7 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_RETURN(1);
}
- if ((pack_length != tab->getFrmLength()) ||
- (memcmp(pack_data, tab->getFrmData(), pack_length)))
+ if (cmp_frm(tab, pack_data, pack_length))
{
if (!invalidating_ndb_table)
{
@@ -951,9 +1016,9 @@ int ha_ndbcluster::get_metadata(const char *path)
else
{
DBUG_PRINT("error",
- ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
+ ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
pack_length, tab->getFrmLength(),
- memcmp(pack_data, tab->getFrmData(), pack_length)));
+ memcmp(pack_data, tab->getFrmData(), pack_length)));
DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
error= 3;
@@ -1980,7 +2045,7 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_ENTER("write_row");
- if (m_ignore_dup_key && table->s->primary_key != MAX_KEY)
+ if (!m_use_write && m_ignore_dup_key && table->s->primary_key != MAX_KEY)
{
int peek_res= peek_row(record);
@@ -2057,7 +2122,8 @@ int ha_ndbcluster::write_row(byte *record)
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
- set_ndb_value(op, field, i, &set_blob_value))
+ (ha_get_bit_in_write_set(i + 1) || !m_use_write) &&
+ set_ndb_value(op, field, i, record-table->record[0], &set_blob_value))
{
m_skip_auto_increment= TRUE;
ERR_RETURN(op->getNdbError());
@@ -2299,7 +2365,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
Field *field= table->field[i];
if (ha_get_bit_in_write_set(i+1) &&
(!(field->flags & PRI_KEY_FLAG)) &&
- set_ndb_value(op, field, i))
+ set_ndb_value(op, field, i, new_data - table->record[0]))
ERR_RETURN(op->getNdbError());
}
@@ -2414,51 +2480,73 @@ int ha_ndbcluster::delete_row(const byte *record)
set to null.
*/
-void ha_ndbcluster::unpack_record(byte* buf)
+static void ndb_unpack_record(TABLE *table, NdbValue *value,
+ MY_BITMAP *defined, byte *buf)
{
+ Field **p_field= table->field, *field= *p_field;
uint row_offset= (uint) (buf - table->record[0]);
- Field **field, **end;
- NdbValue *value= m_value;
- DBUG_ENTER("unpack_record");
+ DBUG_ENTER("ndb_unpack_record");
- end= table->field + table->s->fields;
-
// Set null flag(s)
bzero(buf, table->s->null_bytes);
- for (field= table->field;
- field < end;
- field++, value++)
+ for ( ; field;
+ p_field++, value++, field= *p_field)
{
if ((*value).ptr)
{
- if (! ((*field)->flags & BLOB_FLAG))
+ if (!(field->flags & BLOB_FLAG))
{
- if ((*value).rec->isNULL())
- (*field)->set_null(row_offset);
- else if ((*field)->type() == MYSQL_TYPE_BIT)
+ int is_null= (*value).rec->isNULL();
+ if (is_null)
{
- uint pack_len= (*field)->pack_length();
- if (pack_len < 5)
+ if (is_null > 0)
+ {
+ DBUG_PRINT("info",("[%u] NULL",
+ (*value).rec->getColumn()->getColumnNo()));
+ field->set_null(row_offset);
+ }
+ else
+ {
+ DBUG_PRINT("info",("[%u] UNDEFINED",
+ (*value).rec->getColumn()->getColumnNo()));
+ bitmap_clear_bit(defined,
+ (*value).rec->getColumn()->getColumnNo());
+ }
+ }
+ else if (field->type() == MYSQL_TYPE_BIT)
+ {
+ byte *save_field_ptr= field->ptr;
+ field->ptr= save_field_ptr + row_offset;
+ if (field->pack_length() < 5)
{
DBUG_PRINT("info", ("bit field H'%.8X",
(*value).rec->u_32_value()));
- ((Field_bit *) *field)->store((longlong)
- (*value).rec->u_32_value(),
- FALSE);
+ ((Field_bit*) field)->store((longlong)
+ (*value).rec->u_32_value(), FALSE);
}
else
{
DBUG_PRINT("info", ("bit field H'%.8X%.8X",
- *(Uint32 *)(*value).rec->aRef(),
- *((Uint32 *)(*value).rec->aRef()+1)));
- ((Field_bit *) *field)->store((longlong)
- (*value).rec->u_64_value(), TRUE);
+ *(Uint32*) (*value).rec->aRef(),
+ *((Uint32*) (*value).rec->aRef()+1)));
+ ((Field_bit*) field)->store((longlong)
+ (*value).rec->u_64_value(),TRUE);
}
+ field->ptr= save_field_ptr;
+ DBUG_PRINT("info",("[%u] SET",
+ (*value).rec->getColumn()->getColumnNo()));
+ DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
+ }
+ else
+ {
+ DBUG_PRINT("info",("[%u] SET",
+ (*value).rec->getColumn()->getColumnNo()));
+ DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
}
}
else
{
- NdbBlob* ndb_blob= (*value).blob;
+ NdbBlob *ndb_blob= (*value).blob;
bool isNull= TRUE;
#ifndef DBUG_OFF
int ret=
@@ -2466,11 +2554,16 @@ void ha_ndbcluster::unpack_record(byte* buf)
ndb_blob->getNull(isNull);
DBUG_ASSERT(ret == 0);
if (isNull)
- (*field)->set_null(row_offset);
+ field->set_null(row_offset);
}
}
}
-
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::unpack_record(byte *buf)
+{
+ ndb_unpack_record(table, m_value, 0, buf);
#ifndef DBUG_OFF
// Read and print all values that was fetched
if (table->s->primary_key == MAX_KEY)
@@ -2486,7 +2579,6 @@ void ha_ndbcluster::unpack_record(byte* buf)
}
//print_results();
#endif
- DBUG_VOID_RETURN;
}
/*
@@ -3207,8 +3299,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
- DBUG_PRINT("enter", ("thd: %x, thd_ndb: %x, thd_ndb->lock_count: %d",
- thd, thd_ndb, thd_ndb->lock_count));
+ DBUG_PRINT("enter", ("this: %x thd: %lx thd_ndb: %lx "
+ "thd_ndb->lock_count: %d",
+ this, thd, thd_ndb, thd_ndb->lock_count));
if (lock_type != F_UNLCK)
{
@@ -3461,7 +3554,8 @@ int ndbcluster_commit(THD *thd, bool all)
while ((share= it++))
{
pthread_mutex_lock(&share->mutex);
- DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ", share->table_name, share->commit_count));
+ DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ",
+ share->key, share->commit_count));
share->commit_count= 0;
share->commit_count_lock++;
pthread_mutex_unlock(&share->mutex);
@@ -3790,6 +3884,10 @@ static int create_ndb_column(NDBCOL &col,
return 0;
}
+/*
+ Create a table in NDB Cluster
+*/
+
int ha_ndbcluster::create(const char *name,
TABLE *form,
HA_CREATE_INFO *info)
@@ -3815,7 +3913,8 @@ int ha_ndbcluster::create(const char *name,
caller.
Do Ndb specific stuff, such as create a .ndb file
*/
- my_errno= write_ndb_file();
+ if ((my_errno= write_ndb_file()))
+ DBUG_RETURN(my_errno);
DBUG_RETURN(my_errno);
}
@@ -3829,7 +3928,7 @@ int ha_ndbcluster::create(const char *name,
if (packfrm(data, length, &pack_data, &pack_length))
DBUG_RETURN(2);
- DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
+ DBUG_PRINT("info", ("setFrm data=%lx len=%d", pack_data, pack_length));
tab.setFrm(pack_data, pack_length);
my_free((char*)data, MYF(0));
my_free((char*)pack_data, MYF(0));
@@ -4027,14 +4126,22 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
if (!(orig_tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
}
+
m_table= (void *)orig_tab;
// Change current database to that of target table
set_dbname(to);
ndb->setDatabaseName(m_dbname);
- if (!(result= alter_table_name(new_tabname)))
+
+ if ((result= alter_table_name(new_tabname)))
+ {
+ DBUG_RETURN(result);
+ }
+
+ // Rename .ndb file
+ if ((result= handler::rename_table(from, to)))
{
- // Rename .ndb file
- result= handler::rename_table(from, to);
+ // ToDo in 4.1 should rollback alter table...
+ DBUG_RETURN(result);
}
DBUG_RETURN(result);
@@ -4050,7 +4157,7 @@ int ha_ndbcluster::alter_table_name(const char *to)
Ndb *ndb= get_ndb();
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *orig_tab= (const NDBTAB *) m_table;
- DBUG_ENTER("alter_table_name_table");
+ DBUG_ENTER("alter_table_name");
NdbDictionary::Table new_tab= *orig_tab;
new_tab.setName(to);
@@ -4069,6 +4176,38 @@ int ha_ndbcluster::alter_table_name(const char *to)
*/
+/* static version which does not need a handler */
+
+int
+ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
+ const char *path,
+ const char *db,
+ const char *table_name)
+{
+ DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+ NDBDICT *dict= ndb->getDictionary();
+
+ /* Drop the table from NDB */
+
+ int res;
+ if (h)
+ {
+ res= h->drop_table();
+ }
+ else
+ {
+ ndb->setDatabaseName(db);
+ res= dict->dropTable(table_name);
+ }
+
+ if (res)
+ {
+ DBUG_RETURN(res);
+ }
+
+ DBUG_RETURN(0);
+}
+
int ha_ndbcluster::delete_table(const char *name)
{
DBUG_ENTER("ha_ndbcluster::delete_table");
@@ -4081,9 +4220,8 @@ int ha_ndbcluster::delete_table(const char *name)
/* Call ancestor function to delete .ndb file */
handler::delete_table(name);
-
- /* Drop the table from NDB */
- DBUG_RETURN(drop_table());
+
+ DBUG_RETURN(delete_table(this, get_ndb(),name, m_dbname, m_tabname));
}
@@ -4149,6 +4287,15 @@ ulonglong ha_ndbcluster::get_auto_increment()
Constructor for the NDB Cluster table handler
*/
+#define HA_NDBCLUSTER_TABLE_FLAGS \
+ HA_REC_NOT_IN_SEQ | \
+ HA_NULL_IN_KEY | \
+ HA_AUTO_PART_KEY | \
+ HA_NO_PREFIX_CHAR_KEYS | \
+ HA_NEED_READ_RANGE_BUFFER | \
+ HA_CAN_GEOMETRY | \
+ HA_CAN_BIT_FIELD
+
ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
handler(&ndbcluster_hton, table_arg),
m_active_trans(NULL),
@@ -4156,13 +4303,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_table(NULL),
m_table_version(-1),
m_table_info(NULL),
- m_table_flags(HA_REC_NOT_IN_SEQ |
- HA_NULL_IN_KEY |
- HA_AUTO_PART_KEY |
- HA_NO_PREFIX_CHAR_KEYS |
- HA_NEED_READ_RANGE_BUFFER |
- HA_CAN_GEOMETRY |
- HA_CAN_BIT_FIELD),
+ m_table_flags(HA_NDBCLUSTER_TABLE_FLAGS),
m_share(0),
m_part_info(NULL),
m_use_partition_function(FALSE),
@@ -4170,6 +4311,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_use_write(FALSE),
m_ignore_dup_key(FALSE),
m_primary_key_update(FALSE),
+ m_ignore_no_key(FALSE),
m_rows_to_insert((ha_rows) 1),
m_rows_inserted((ha_rows) 0),
m_bulk_insert_rows((ha_rows) 1024),
@@ -4223,7 +4365,9 @@ ha_ndbcluster::~ha_ndbcluster()
DBUG_ENTER("~ha_ndbcluster");
if (m_share)
- free_share(m_share);
+ {
+ free_share(&m_share);
+ }
release_metadata();
my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
m_blobs_buffer= 0;
@@ -4256,8 +4400,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
int res;
KEY *key;
DBUG_ENTER("open");
- DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
- name, mode, test_if_locked));
+ DBUG_PRINT("enter", ("this: %d name: %s mode: %d test_if_locked: %d",
+ this, name, mode, test_if_locked));
// Setup ref_length to make room for the whole
// primary key to be written in the ref variable
@@ -4277,7 +4421,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
set_tabname(name);
if (check_ndb_connection()) {
- free_share(m_share); m_share= 0;
+ free_share(&m_share);
+ m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
@@ -4306,7 +4451,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
int ha_ndbcluster::close(void)
{
DBUG_ENTER("close");
- free_share(m_share); m_share= 0;
+ free_share(&m_share);
+ m_share= 0;
release_metadata();
DBUG_RETURN(0);
}
@@ -4509,21 +4655,24 @@ int ndbcluster_drop_database(const char *path)
ERR_RETURN(dict->getNdbError());
for (i= 0 ; i < list.count ; i++)
{
- NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+ NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
+ DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
// Add only tables that belong to db
- if (my_strcasecmp(system_charset_info, t.database, dbname))
+ if (my_strcasecmp(system_charset_info, elmt.database, dbname))
continue;
- DBUG_PRINT("info", ("%s must be dropped", t.name));
- drop_list.push_back(thd->strdup(t.name));
+ DBUG_PRINT("info", ("%s must be dropped", elmt.name));
+ drop_list.push_back(thd->strdup(elmt.name));
}
// Drop any tables belonging to database
+ char full_path[FN_REFLEN];
+ char *tmp= strxnmov(full_path, FN_REFLEN, share_prefix, dbname, "/", NullS);
ndb->setDatabaseName(dbname);
List_iterator_fast<char> it(drop_list);
while ((tabname=it++))
{
- if (dict->dropTable(tabname))
+ strxnmov(tmp, FN_REFLEN - (tmp - full_path), tabname, NullS);
+ if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
{
const NdbError err= dict->getNdbError();
if (err.code != 709)
@@ -4536,6 +4685,92 @@ int ndbcluster_drop_database(const char *path)
DBUG_RETURN(ret);
}
+/*
+ find all tables in ndb and discover those needed
+*/
+static int ndbcluster_find_all_files(THD *thd)
+{
+ DBUG_ENTER("ndbcluster_find_all_files");
+ Ndb* ndb;
+ char key[FN_REFLEN];
+ NdbDictionary::Dictionary::List list;
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ NDBDICT *dict= ndb->getDictionary();
+
+ int unhandled, retries= 5;
+ do
+ {
+ if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+ ERR_RETURN(dict->getNdbError());
+ unhandled= 0;
+ for (uint i= 0 ; i < list.count ; i++)
+ {
+ NDBDICT::List::Element& elmt= list.elements[i];
+ DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
+ if (!(elmt.state == NDBOBJ::StateBuilding ||
+ elmt.state == NDBOBJ::StateOnline))
+ {
+ sql_print_information("NDB: skipping setup table %s.%s, in state %d",
+ elmt.database, elmt.name, elmt.state);
+ continue;
+ }
+
+ ndb->setDatabaseName(elmt.database);
+ const NDBTAB *ndbtab;
+
+ if (!(ndbtab= dict->getTable(elmt.name)))
+ {
+ sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+ elmt.database, elmt.name,
+ dict->getNdbError().code,
+ dict->getNdbError().message);
+ unhandled++;
+ continue;
+ }
+
+ if (ndbtab->getFrmLength() == 0)
+ continue;
+
+ strxnmov(key, FN_LEN, mysql_data_home, "/",
+ elmt.database, "/", elmt.name, NullS);
+ const void *data= 0, *pack_data= 0;
+ uint length, pack_length;
+ int discover= 0;
+ if (readfrm(key, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ discover= 1;
+ sql_print_information("NDB: missing frm for %s.%s, discovering...",
+ elmt.database, elmt.name);
+ }
+ else if (cmp_frm(ndbtab, pack_data, pack_length))
+ {
+ discover= 1;
+ sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
+ elmt.database, elmt.name);
+ }
+ my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
+
+ if (discover)
+ {
+ /* ToDo 4.1 database needs to be created if missing */
+ pthread_mutex_lock(&LOCK_open);
+ if (ha_create_table_from_engine(thd, elmt.database, elmt.name))
+ {
+ /* ToDo 4.1 handle error */
+ }
+ pthread_mutex_unlock(&LOCK_open);
+ }
+ }
+ }
+ while (unhandled && retries--);
+
+ DBUG_RETURN(0);
+}
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files)
@@ -4547,7 +4782,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
Ndb* ndb;
char name[FN_REFLEN];
HASH ndb_tables, ok_tables;
- NdbDictionary::Dictionary::List list;
+ NDBDICT::List list;
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -4578,11 +4813,11 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
for (i= 0 ; i < list.count ; i++)
{
- NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+ NDBDICT::List::Element& elmt= list.elements[i];
+ DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
// Add only tables that belong to db
- if (my_strcasecmp(system_charset_info, t.database, db))
+ if (my_strcasecmp(system_charset_info, elmt.database, db))
continue;
// Apply wildcard to list of tables in NDB
@@ -4590,14 +4825,14 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
{
if (lower_case_table_names)
{
- if (wild_case_compare(files_charset_info, t.name, wild))
+ if (wild_case_compare(files_charset_info, elmt.name, wild))
continue;
}
- else if (wild_compare(t.name,wild,0))
+ else if (wild_compare(elmt.name,wild,0))
continue;
}
- DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));
- my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
+ DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", elmt.name));
+ my_hash_insert(&ndb_tables, (byte*)thd->strdup(elmt.name));
}
char *file_name;
@@ -4645,10 +4880,15 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
file_name= hash_element(&ndb_tables, i);
if (!hash_search(&ok_tables, file_name, strlen(file_name)))
{
- DBUG_PRINT("info", ("%s must be discovered", file_name));
- // File is in list of ndb tables and not in ok_tables
- // This table need to be created
- create_list.push_back(thd->strdup(file_name));
+ strxnmov(name, sizeof(name),
+ mysql_data_home, "/", db, "/", file_name, reg_ext, NullS);
+ if (access(name, F_OK))
+ {
+ DBUG_PRINT("info", ("%s must be discovered", file_name));
+ // File is in list of ndb tables and not in ok_tables
+ // This table need to be created
+ create_list.push_back(thd->strdup(file_name));
+ }
}
}
@@ -4704,6 +4944,7 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
static int connect_callback()
{
update_status_variables(g_ndb_cluster_connection);
+ pthread_cond_signal(&COND_ndb_util_thread);
return 0;
}
@@ -4778,6 +5019,7 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_ndb_util_thread, NULL);
@@ -4840,6 +5082,7 @@ bool ndbcluster_end()
pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0;
+ ndbcluster_util_inited= 0;
DBUG_RETURN(0);
}
@@ -5112,7 +5355,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
char name[FN_REFLEN];
NDB_SHARE *share;
- (void)strxnmov(name, FN_REFLEN, "./",dbname,"/",tabname,NullS);
+ (void)strxnmov(name, FN_REFLEN, share_prefix, dbname, "/", tabname, NullS);
DBUG_PRINT("enter", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
@@ -5120,8 +5363,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
strlen(name))))
{
pthread_mutex_unlock(&ndbcluster_mutex);
- DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables",
- name));
+ DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", name));
DBUG_RETURN(1);
}
share->use_count++;
@@ -5136,7 +5378,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
DBUG_PRINT("info", ("Getting commit_count: %llu from share",
share->commit_count));
pthread_mutex_unlock(&share->mutex);
- free_share(share);
+ free_share(&share);
DBUG_RETURN(0);
}
}
@@ -5151,7 +5393,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
struct Ndb_statistics stat;
if (ndb_get_table_statistics(ndb, tabname, &stat))
{
- free_share(share);
+ free_share(&share);
DBUG_RETURN(1);
}
@@ -5168,7 +5410,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
*commit_count= 0;
}
pthread_mutex_unlock(&share->mutex);
- free_share(share);
+ free_share(&share);
DBUG_RETURN(0);
}
@@ -5305,6 +5547,60 @@ ha_ndbcluster::register_query_cache_table(THD *thd,
}
+#ifndef DBUG_OFF
+static void dbug_print_table(const char *info, TABLE *table)
+{
+ if (table == 0)
+ {
+ DBUG_PRINT("info",("%s: (null)", info));
+ return;
+ }
+ DBUG_PRINT("info",
+ ("%s: %s.%s s->fields: %d "
+ "reclength: %d rec_buff_length: %d record[0]: %lx "
+ "record[1]: %lx",
+ info,
+ table->s->db,
+ table->s->table_name,
+ table->s->fields,
+ table->s->reclength,
+ table->s->rec_buff_length,
+ table->record[0],
+ table->record[1]));
+
+ for (unsigned int i= 0; i < table->s->fields; i++)
+ {
+ Field *f= table->field[i];
+ DBUG_PRINT("info",
+ ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
+ "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
+ i,
+ f->field_name,
+ f->flags,
+ (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
+ (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
+ (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
+ (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
+ (f->flags & BLOB_FLAG) ? ",blob" : "",
+ (f->flags & BINARY_FLAG) ? ",binary" : "",
+ f->real_type(),
+ f->pack_length(),
+ f->ptr, f->ptr - table->record[0],
+ f->null_bit,
+ f->null_ptr, (byte*) f->null_ptr - table->record[0]));
+ if (f->type() == MYSQL_TYPE_BIT)
+ {
+ Field_bit *g= (Field_bit*) f;
+ DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
+ "bit_ofs: %u bit_len: %u",
+ g->field_length, g->bit_ptr,
+ (byte*) g->bit_ptr-table->record[0],
+ g->bit_ofs, g->bit_len));
+ }
+ }
+}
+#endif
+
/*
Handling the shared NDB_SHARE structure that is needed to
provide table locking.
@@ -5313,68 +5609,195 @@ ha_ndbcluster::register_query_cache_table(THD *thd,
data we want to or can share.
*/
-static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
+static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)))
{
- *length=share->table_name_length;
- return (byte*) share->table_name;
+ *length= share->key_length;
+ return (byte*) share->key;
}
-static NDB_SHARE* get_share(const char *table_name)
+#ifndef DBUG_OFF
+static void dbug_print_open_tables()
+{
+ DBUG_ENTER("dbug_print_open_tables");
+ for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+ {
+ NDB_SHARE *share= (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
+ DBUG_PRINT("share",
+ ("[%d] 0x%lx key: %s key_length: %d",
+ i, share, share->key, share->key_length));
+ DBUG_PRINT("share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ }
+ DBUG_VOID_RETURN;
+}
+#else
+#define dbug_print_open_tables()
+#endif
+
+/*
+ Increase refcount on existing share.
+ Always returns share and cannot fail.
+*/
+static NDB_SHARE *get_share(NDB_SHARE *share)
{
- NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
- uint length=(uint) strlen(table_name);
- if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
- (byte*) table_name,
- length)))
+ share->use_count++;
+
+ dbug_print_open_tables();
+
+ DBUG_PRINT("get_share",
+ ("0x%lx key: %s key_length: %d",
+ share, share->key, share->key_length));
+ DBUG_PRINT("get_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return share;
+}
+
+/*
+ Get a share object for key
+
+ Returns share for key, and increases the refcount on the share.
+
+ create_if_not_exists == TRUE:
+    creates share if it does not already exist
+ returns 0 only due to out of memory, and then sets my_error
+
+ create_if_not_exists == FALSE:
+ returns 0 if share does not exist
+
+ have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
+*/
+static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
+ bool have_lock)
+{
+ NDB_SHARE *share;
+ if (!have_lock)
+ pthread_mutex_lock(&ndbcluster_mutex);
+ uint length= (uint) strlen(key);
+ if (!(share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) key,
+ length)))
{
- if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
+ if (!create_if_not_exists)
+ {
+ DBUG_PRINT("error", ("get_share: %s does not exist", key));
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return 0;
+ }
+ if ((share= (NDB_SHARE*) my_malloc(sizeof(*share),
MYF(MY_WME | MY_ZEROFILL))))
{
- share->table_name_length=length;
- share->table_name=(char*) (share+1);
- strmov(share->table_name,table_name);
+ MEM_ROOT **root_ptr=
+ my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
+ MEM_ROOT *old_root= *root_ptr;
+ init_sql_alloc(&share->mem_root, 1024, 0);
+ *root_ptr= &share->mem_root; // remember to reset before return
+
+ /* enough space for key, db, and table_name */
+ share->key= alloc_root(*root_ptr, 2 * (length + 1));
+ share->key_length= length;
+ strmov(share->key, key);
if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
{
- pthread_mutex_unlock(&ndbcluster_mutex);
- my_free((gptr) share,0);
+ free_root(&share->mem_root, MYF(0));
+ my_free((gptr) share, 0);
+ *root_ptr= old_root;
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
return 0;
}
thr_lock_init(&share->lock);
- pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
share->commit_count= 0;
share->commit_count_lock= 0;
+ share->db= share->key + length + 1;
+ ha_ndbcluster::set_dbname(key, share->db);
+ share->table_name= share->db + strlen(share->db) + 1;
+ ha_ndbcluster::set_tabname(key, share->table_name);
+ *root_ptr= old_root;
}
else
{
- DBUG_PRINT("error", ("Failed to alloc share"));
- pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_PRINT("error", ("get_share: failed to alloc share"));
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*share));
return 0;
}
}
share->use_count++;
- DBUG_PRINT("share",
- ("table_name: %s, length: %d, use_count: %d, commit_count: %d",
- share->table_name, share->table_name_length, share->use_count,
- share->commit_count));
- pthread_mutex_unlock(&ndbcluster_mutex);
+ dbug_print_open_tables();
+
+ DBUG_PRINT("get_share",
+ ("0x%lx key: %s key_length: %d key: %s",
+ share, share->key, share->key_length, key));
+ DBUG_PRINT("get_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
return share;
}
+static void real_free_share(NDB_SHARE **share)
+{
+ DBUG_PRINT("real_free_share",
+ ("0x%lx key: %s key_length: %d",
+ (*share), (*share)->key, (*share)->key_length));
+ DBUG_PRINT("real_free_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ (*share)->db, (*share)->table_name,
+ (*share)->use_count, (*share)->commit_count));
+
+ hash_delete(&ndbcluster_open_tables, (byte*) *share);
+ thr_lock_delete(&(*share)->lock);
+ pthread_mutex_destroy(&(*share)->mutex);
+ free_root(&(*share)->mem_root, MYF(0));
+
+ my_free((gptr) *share, MYF(0));
+ *share= 0;
+
+ dbug_print_open_tables();
+}
+
+/*
+ decrease refcount of share
+ calls real_free_share when refcount reaches 0
-static void free_share(NDB_SHARE *share)
+ have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
+*/
+static void free_share(NDB_SHARE **share, bool have_lock)
{
- pthread_mutex_lock(&ndbcluster_mutex);
- if (!--share->use_count)
+ if (!have_lock)
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if ((*share)->util_lock == current_thd)
+ (*share)->util_lock= 0;
+ if (!--(*share)->use_count)
{
- hash_delete(&ndbcluster_open_tables, (byte*) share);
- thr_lock_delete(&share->lock);
- pthread_mutex_destroy(&share->mutex);
- my_free((gptr) share, MYF(0));
+ real_free_share(share);
}
- pthread_mutex_unlock(&ndbcluster_mutex);
+ else
+ {
+ dbug_print_open_tables();
+ DBUG_PRINT("free_share",
+ ("0x%lx key: %s key_length: %d",
+ *share, (*share)->key, (*share)->key_length));
+ DBUG_PRINT("free_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ (*share)->db, (*share)->table_name,
+ (*share)->use_count, (*share)->commit_count));
+ }
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
}
@@ -5405,14 +5828,14 @@ static int packfrm(const void *data, uint len,
uint blob_len;
frm_blob_struct* blob;
DBUG_ENTER("packfrm");
- DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
+ DBUG_PRINT("enter", ("data: 0x%lx len: %d", data, len));
error= 1;
org_len= len;
if (my_compress((byte*)data, &org_len, &comp_len))
goto err;
- DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
+ DBUG_PRINT("info", ("org_len: %d comp_len: %d", org_len, comp_len));
DBUG_DUMP("compressed", (char*)data, org_len);
error= 2;
@@ -5432,7 +5855,8 @@ static int packfrm(const void *data, uint len,
*pack_len= blob_len;
error= 0;
- DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
+ DBUG_PRINT("exit",
+ ("pack_data: 0x%lx pack_len: %d", *pack_data, *pack_len));
err:
DBUG_RETURN(error);
@@ -5446,7 +5870,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
byte *data;
ulong complen, orglen, ver;
DBUG_ENTER("unpackfrm");
- DBUG_PRINT("enter", ("pack_data: %x", pack_data));
+ DBUG_PRINT("enter", ("pack_data: 0x%lx", pack_data));
complen= uint4korr((char*)&blob->head.complen);
orglen= uint4korr((char*)&blob->head.orglen);
@@ -5471,7 +5895,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
*unpack_data= data;
*unpack_len= complen;
- DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
+ DBUG_PRINT("exit", ("frmdata: 0x%lx, len: %d", *unpack_data, *unpack_len));
DBUG_RETURN(0);
}
@@ -5484,11 +5908,10 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table));
NdbTransaction* pTrans= ndb->startTransaction();
+ if (pTrans == NULL)
+ ERR_RETURN(ndb->getNdbError());
do
{
- if (pTrans == NULL)
- break;
-
NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
if (pOp == NULL)
break;
@@ -6010,6 +6433,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
THD *thd; /* needs to be first for thread_stack */
Ndb* ndb;
struct timespec abstime;
+ List<NDB_SHARE> util_open_tables;
my_thread_init();
DBUG_ENTER("ndb_util_thread");
@@ -6030,10 +6454,51 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
delete ndb;
DBUG_RETURN(NULL);
}
+ thd->init_for_queries();
+ thd->version=refresh_version;
+ thd->set_time();
+ thd->main_security_ctx.host_or_ip= "";
+ thd->client_capabilities = 0;
+ my_net_init(&thd->net, 0);
+ thd->main_security_ctx.master_access= ~0;
+ thd->main_security_ctx.priv_user = 0;
+
+ /*
+ wait for mysql server to start
+ */
+ pthread_mutex_lock(&LOCK_server_started);
+ while (!mysqld_server_started)
+ pthread_cond_wait(&COND_server_started, &LOCK_server_started);
+ pthread_mutex_unlock(&LOCK_server_started);
+
+ /*
+ Wait for cluster to start
+ */
+ pthread_mutex_lock(&LOCK_ndb_util_thread);
+ while (!ndb_cluster_node_id)
+ {
+ /* ndb not connected yet */
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&COND_ndb_util_thread,
+ &LOCK_ndb_util_thread,
+ &abstime);
+ if (abort_loop)
+ {
+ pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ goto ndb_util_thread_end;
+ }
+ }
+ pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+ /*
+ Get all table definitions from the storage node
+ */
+ ndbcluster_find_all_files(thd);
+
+ ndbcluster_util_inited= 1;
- List<NDB_SHARE> util_open_tables;
set_timespec(abstime, 0);
- for (;;)
+ for (;!abort_loop;)
{
pthread_mutex_lock(&LOCK_ndb_util_thread);
@@ -6041,10 +6506,10 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
&LOCK_ndb_util_thread,
&abstime);
pthread_mutex_unlock(&LOCK_ndb_util_thread);
-
+#ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
ndb_cache_check_time));
-
+#endif
if (abort_loop)
break; /* Shutting down server */
@@ -6075,20 +6540,12 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
List_iterator_fast<NDB_SHARE> it(util_open_tables);
while ((share= it++))
{
- /* Split tab- and dbname */
- char buf[FN_REFLEN];
- char *tabname, *db;
- uint length= dirname_length(share->table_name);
- tabname= share->table_name+length;
- memcpy(buf, share->table_name, length-1);
- buf[length-1]= 0;
- db= buf+dirname_length(buf);
DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s",
- share->table_name));
+ share->key));
/* Contact NDB to get commit count for table */
- ndb->setDatabaseName(db);
+ ndb->setDatabaseName(share->db);
struct Ndb_statistics stat;
uint lock;
@@ -6096,17 +6553,17 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
lock= share->commit_count_lock;
pthread_mutex_unlock(&share->mutex);
- if (ndb_get_table_statistics(ndb, tabname, &stat) == 0)
+ if (ndb_get_table_statistics(ndb, share->table_name, &stat) == 0)
{
DBUG_PRINT("ndb_util_thread",
("Table: %s, commit_count: %llu, rows: %llu",
- share->table_name, stat.commit_count, stat.row_count));
+ share->key, stat.commit_count, stat.row_count));
}
else
{
DBUG_PRINT("ndb_util_thread",
("Error: Could not get commit count for table %s",
- share->table_name));
+ share->key));
stat.commit_count= 0;
}
@@ -6116,7 +6573,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
pthread_mutex_unlock(&share->mutex);
/* Decrease the use count and possibly free share */
- free_share(share);
+ free_share(&share);
}
/* Clear the list of open tables */
@@ -6143,7 +6600,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
abstime.tv_nsec-= 1000000000;
}
}
-
+ndb_util_thread_end:
+ net_end(&thd->net);
thd->cleanup();
delete thd;
delete ndb;
@@ -7480,6 +7938,9 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
DBUG_RETURN(0);
}
+/*
+ Implements the SHOW NDB STATUS command.
+*/
int
ndbcluster_show_status(THD* thd)
{