summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster.cc
diff options
context:
space:
mode:
authorunknown <tomas@poseidon.ndb.mysql.com>2004-10-03 21:39:04 +0000
committerunknown <tomas@poseidon.ndb.mysql.com>2004-10-03 21:39:04 +0000
commit6b1895b86400e8531dd99b89421bc44984568ed8 (patch)
treeaba1618cac773466e58a7ff45f66bb205fe52130 /sql/ha_ndbcluster.cc
parent3a15d0884d0b9a248d2f3b0f66a05c35b942badc (diff)
parent25bcfeef6093dfdfcd84323dc1ed4723ea71f7fc (diff)
downloadmariadb-git-6b1895b86400e8531dd99b89421bc44984568ed8.tar.gz
Merge
BitKeeper/etc/logging_ok: auto-union sql/ha_ndbcluster.h: Auto merged sql/handler.cc: Auto merged sql/handler.h: Auto merged sql/mysql_priv.h: Auto merged sql/sql_base.cc: Auto merged sql/sql_show.cc: Auto merged sql/sql_table.cc: Auto merged mysql-test/mysql-test-run.sh: SCCS merged sql/ha_ndbcluster.cc: SCCS merged
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--sql/ha_ndbcluster.cc512
1 files changed, 341 insertions, 171 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index e48ac8cc75e..d7a580dbb88 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -32,9 +32,6 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/NdbScanFilter.hpp>
-#define USE_DISCOVER_ON_STARTUP
-//#define USE_NDB_POOL
-
// Default value for parallelism
static const int parallelism= 240;
@@ -48,11 +45,13 @@ static const ha_rows autoincrement_prefetch= 32;
// connectstring to cluster if given by mysqld
const char *ndbcluster_connectstring= 0;
+static const char *ha_ndb_ext=".ndb";
+
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
#define ERR_PRINT(err) \
- DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
+ DBUG_PRINT("error", ("%d message: %s", err.code, err.message))
#define ERR_RETURN(err) \
{ \
@@ -107,7 +106,9 @@ static const err_code_mapping err_map[]=
{ 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 721, HA_ERR_TABLE_EXIST },
{ 4244, HA_ERR_TABLE_EXIST },
- { 241, HA_ERR_OLD_METADATA },
+
+ { 709, HA_ERR_NO_SUCH_TABLE },
+ { 284, HA_ERR_NO_SUCH_TABLE },
{ 266, HA_ERR_LOCK_WAIT_TIMEOUT },
{ 274, HA_ERR_LOCK_WAIT_TIMEOUT },
@@ -147,7 +148,25 @@ int execute_no_commit(ha_ndbcluster *h, NdbConnection *trans)
int m_batch_execute= 0;
if (false && m_batch_execute)
return 0;
- return trans->execute(NoCommit);
+ return trans->execute(NoCommit,AbortOnError,1);
+}
+
+inline
+int execute_commit(ha_ndbcluster *h, NdbConnection *trans)
+{
+ int m_batch_execute= 0;
+ if (false && m_batch_execute)
+ return 0;
+ return trans->execute(Commit,AbortOnError,1);
+}
+
+inline
+int execute_no_commit_ie(ha_ndbcluster *h, NdbConnection *trans)
+{
+ int m_batch_execute= 0;
+ if (false && m_batch_execute)
+ return 0;
+ return trans->execute(NoCommit,IgnoreError,1);
}
/*
@@ -178,19 +197,9 @@ struct Ndb_table_local_info {
ha_rows records;
};
-void ha_ndbcluster::set_rec_per_key()
-{
- DBUG_ENTER("ha_ndbcluster::get_status_const");
- for (uint i=0 ; i < table->keys ; i++)
- {
- table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
- }
- DBUG_VOID_RETURN;
-}
-
void ha_ndbcluster::records_update()
{
- DBUG_ENTER("ha_ndbcluster::get_status_variable");
+ DBUG_ENTER("ha_ndbcluster::records_update");
struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
((const NDBTAB *)m_table)->getTableId(),
@@ -278,6 +287,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
NDBDICT *dict= m_ndb->getDictionary();
DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
dict->invalidateTable(m_tabname);
+ table->version=0L; /* Free when thread is ready */
break;
}
default:
@@ -318,7 +328,8 @@ bool ha_ndbcluster::get_error_message(int error,
/*
Check if type is supported by NDB.
- TODO Use this once, not in every operation
+ TODO Use this once in open(), not in every operation
+
*/
static inline bool ndb_supported_type(enum_field_types type)
@@ -665,7 +676,7 @@ int ha_ndbcluster::get_metadata(const char *path)
memcmp(pack_data, tab->getFrmData(), pack_length)));
DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
- error= HA_ERR_OLD_METADATA;
+ error= 3;
invalidating_ndb_table= false;
}
}
@@ -689,11 +700,11 @@ int ha_ndbcluster::get_metadata(const char *path)
int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
{
+ uint i;
int error= 0;
- char *name;
- const char *index_name;
+ const char *name, *index_name;
+ char unique_index_name[FN_LEN];
static const char* unique_suffix= "$unique";
- uint i, name_len;
KEY* key_info= tab->key_info;
const char **key_name= tab->keynames.type_names;
NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
@@ -707,21 +718,15 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
m_index[i].type= idx_type;
if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
{
- name_len= strlen(index_name)+strlen(unique_suffix)+1;
- // Create name for unique index by appending "$unique";
- if (!(name= my_malloc(name_len, MYF(MY_WME))))
- DBUG_RETURN(2);
- strxnmov(name, name_len, index_name, unique_suffix, NullS);
- m_index[i].unique_name= name;
- DBUG_PRINT("info", ("Created unique index name: %s for index %d",
- name, i));
+ strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
+ DBUG_PRINT("info", ("Created unique index name \'%s\' for index %d",
+ unique_index_name, i));
}
// Create secondary indexes if in create phase
if (phase == ILBP_CREATE)
{
- DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
-
- switch (m_index[i].type){
+ DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
+ switch (idx_type){
case PRIMARY_KEY_INDEX:
// Do nothing, already created
@@ -731,10 +736,10 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
break;
case UNIQUE_ORDERED_INDEX:
if (!(error= create_ordered_index(index_name, key_info)))
- error= create_unique_index(get_unique_index_name(i), key_info);
+ error= create_unique_index(unique_index_name, key_info);
break;
case UNIQUE_INDEX:
- error= create_unique_index(get_unique_index_name(i), key_info);
+ error= create_unique_index(unique_index_name, key_info);
break;
case ORDERED_INDEX:
error= create_ordered_index(index_name, key_info);
@@ -751,21 +756,20 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
}
}
// Add handles to index objects
- DBUG_PRINT("info", ("Trying to add handle to index %s", index_name));
- if ((m_index[i].type != PRIMARY_KEY_INDEX) &&
- (m_index[i].type != UNIQUE_INDEX))
+ if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
{
+ DBUG_PRINT("info", ("Get handle to index %s", index_name));
const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].index= (void *) index;
}
- if (m_index[i].unique_name)
+ if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
{
- const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname);
+ DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
+ const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].unique_index= (void *) index;
}
- DBUG_PRINT("info", ("Added handle to index %s", index_name));
}
DBUG_RETURN(error);
@@ -801,9 +805,6 @@ void ha_ndbcluster::release_metadata()
// Release index list
for (i= 0; i < MAX_KEY; i++)
{
- if (m_index[i].unique_name)
- my_free((char*)m_index[i].unique_name, MYF(0));
- m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
}
@@ -813,17 +814,15 @@ void ha_ndbcluster::release_metadata()
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
- int lm;
if (type == TL_WRITE_ALLOW_WRITE)
- lm= NdbOperation::LM_Exclusive;
+ return NdbOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields))
/*
TODO use a new scan mode to read + lock + keyinfo
*/
- lm= NdbOperation::LM_Exclusive;
+ return NdbOperation::LM_Exclusive;
else
- lm= NdbOperation::LM_CommittedRead;
- return lm;
+ return NdbOperation::LM_CommittedRead;
}
static const ulong index_type_flags[]=
@@ -861,16 +860,6 @@ static const ulong index_type_flags[]=
static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
-inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
-{
- return table->keynames.type_names[idx_no];
-}
-
-inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
-{
- return m_index[idx_no].unique_name;
-}
-
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{
DBUG_ASSERT(idx_no < MAX_KEY);
@@ -1006,7 +995,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
}
}
- if (trans->execute(NoCommit, IgnoreError) != 0)
+ if (execute_no_commit_ie(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -1087,7 +1076,6 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_ENTER("unique_index_read");
DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
DBUG_DUMP("key", (char*)key, key_len);
- DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
@@ -1127,7 +1115,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
}
}
- if (trans->execute(NoCommit, IgnoreError) != 0)
+ if (execute_no_commit_ie(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -1204,7 +1192,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
}
else
{
- if (ops_pending && (trans->execute(Commit) != 0))
+ if (ops_pending && (execute_commit(this,trans) != 0))
DBUG_RETURN(ndb_err(trans));
trans->restart();
}
@@ -1343,7 +1331,6 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbIndexScanOperation *op;
- const char *index_name;
DBUG_ENTER("ordered_index_scan");
DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
@@ -1352,7 +1339,6 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_EXECUTE("enter", print_key(start_key, "start_key"););
DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
- index_name= get_index_name(active_index);
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
@@ -1561,8 +1547,8 @@ int ha_ndbcluster::write_row(byte *record)
}
statistic_increment(ha_write_count,&LOCK_status);
- if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
- table->timestamp_field->set_time();
+ if (table->timestamp_default_now)
+ update_timestamp(record+table->timestamp_default_now-1);
has_auto_increment= (table->next_number_field && record == table->record[0]);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
@@ -1639,7 +1625,7 @@ int ha_ndbcluster::write_row(byte *record)
}
else
{
- if (trans->execute(Commit) != 0)
+ if (execute_commit(this,trans) != 0)
{
skip_auto_increment= true;
no_uncommitted_rows_execute_failure();
@@ -1712,9 +1698,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
DBUG_ENTER("update_row");
statistic_increment(ha_update_count,&LOCK_status);
- if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
- table->timestamp_field->set_time();
-
+ if (table->timestamp_on_update_now)
+ update_timestamp(new_data+table->timestamp_on_update_now-1);
+
/* Check for update of primary key for special handling */
if ((table->primary_key != MAX_KEY) &&
(key_cmp(table->primary_key, old_data, new_data)))
@@ -2419,6 +2405,8 @@ void ha_ndbcluster::info(uint flag)
DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
if (flag & HA_STATUS_TIME)
DBUG_PRINT("info", ("HA_STATUS_TIME"));
+ if (flag & HA_STATUS_CONST)
+ DBUG_PRINT("info", ("HA_STATUS_CONST"));
if (flag & HA_STATUS_VARIABLE)
{
DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
@@ -2434,11 +2422,6 @@ void ha_ndbcluster::info(uint flag)
}
}
}
- if (flag & HA_STATUS_CONST)
- {
- DBUG_PRINT("info", ("HA_STATUS_CONST"));
- set_rec_per_key();
- }
if (flag & HA_STATUS_ERRKEY)
{
DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
@@ -2660,7 +2643,7 @@ int ha_ndbcluster::reset()
const char **ha_ndbcluster::bas_ext() const
-{ static const char *ext[1]= { NullS }; return ext; }
+{ static const char *ext[]= { ".ndb", NullS }; return ext; }
/*
@@ -2684,14 +2667,13 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
enum thr_lock_type lock_type)
{
DBUG_ENTER("store_lock");
-
if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK)
{
-
+
/* If we are not doing a LOCK TABLE, then allow multiple
writers */
- if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
+ if ((lock_type >= TL_WRITE_ALLOW_WRITE &&
lock_type <= TL_WRITE) && !thd->in_lock_tables)
lock_type= TL_WRITE_ALLOW_WRITE;
@@ -2858,18 +2840,26 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
}
m_table= NULL;
m_table_info= NULL;
- if (m_active_trans)
- DBUG_PRINT("warning", ("m_active_trans != NULL"));
+ /*
+ This is the place to make sure this handler instance
+ no longer are connected to the active transaction.
+
+ And since the handler is no longer part of the transaction
+ it can't have open cursors, ops or blobs pending.
+ */
+ m_active_trans= NULL;
+
if (m_active_cursor)
DBUG_PRINT("warning", ("m_active_cursor != NULL"));
+ m_active_cursor= NULL;
+
if (blobs_pending)
DBUG_PRINT("warning", ("blobs_pending != 0"));
+ blobs_pending= 0;
+
if (ops_pending)
DBUG_PRINT("warning", ("ops_pending != 0L"));
- m_active_trans= NULL;
- m_active_cursor= NULL;
ops_pending= 0;
- blobs_pending= 0;
}
DBUG_RETURN(error);
}
@@ -2929,7 +2919,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
"stmt" : "all"));
DBUG_ASSERT(ndb && trans);
- if (trans->execute(Commit) != 0)
+ if (execute_commit(0,trans) != 0)
{
const NdbError err= trans->getNdbError();
const NdbOperation *error_op= trans->getNdbErrorOperation();
@@ -3180,12 +3170,24 @@ int ha_ndbcluster::create(const char *name,
const void *data, *pack_data;
const char **key_names= form->keynames.type_names;
char name2[FN_HEADLEN];
+ bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);
DBUG_ENTER("create");
DBUG_PRINT("enter", ("name: %s", name));
fn_format(name2, name, "", "",2); // Remove the .frm extension
set_dbname(name2);
- set_tabname(name2);
+ set_tabname(name2);
+
+ if (create_from_engine)
+ {
+ /*
+ Table alreay exists in NDB and frm file has been created by
+ caller.
+ Do Ndb specific stuff, such as create a .ndb file
+ */
+ my_errno= write_ndb_file();
+ DBUG_RETURN(my_errno);
+ }
DBUG_PRINT("table", ("name: %s", m_tabname));
tab.setName(m_tabname);
@@ -3226,16 +3228,12 @@ int ha_ndbcluster::create(const char *name,
tab.addColumn(col);
}
- my_errno= 0;
- if (check_ndb_connection())
- {
- my_errno= HA_ERR_NO_CONNECTION;
+ if ((my_errno= check_ndb_connection()))
DBUG_RETURN(my_errno);
- }
// Create the table in NDB
NDBDICT *dict= m_ndb->getDictionary();
- if (dict->createTable(tab))
+ if (dict->createTable(tab) != 0)
{
const NdbError err= dict->getNdbError();
ERR_PRINT(err);
@@ -3248,6 +3246,9 @@ int ha_ndbcluster::create(const char *name,
// Create secondary indexes
my_errno= build_index_list(form, ILBP_CREATE);
+ if (!my_errno)
+ my_errno= write_ndb_file();
+
DBUG_RETURN(my_errno);
}
@@ -3283,7 +3284,6 @@ int ha_ndbcluster::create_index(const char *name,
DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name));
- // NdbDictionary::Index ndb_index(name);
NdbDictionary::Index ndb_index(name);
if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
@@ -3324,14 +3324,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
set_tabname(from);
set_tabname(to, new_tabname);
- if (check_ndb_connection()) {
- my_errno= HA_ERR_NO_CONNECTION;
- DBUG_RETURN(my_errno);
- }
+ if (check_ndb_connection())
+ DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
+
int result= alter_table_name(m_tabname, new_tabname);
if (result == 0)
+ {
set_tabname(to);
+ handler::rename_table(from, to);
+ }
DBUG_RETURN(result);
}
@@ -3376,6 +3378,8 @@ int ha_ndbcluster::delete_table(const char *name)
if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ handler::delete_table(name);
DBUG_RETURN(drop_table());
}
@@ -3477,7 +3481,6 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
for (i= 0; i < MAX_KEY; i++)
{
m_index[i].type= UNDEFINED_INDEX;
- m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
}
@@ -3520,7 +3523,6 @@ ha_ndbcluster::~ha_ndbcluster()
int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
{
- int res;
KEY *key;
DBUG_ENTER("open");
DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
@@ -3547,11 +3549,8 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
free_share(m_share); m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
- res= get_metadata(name);
- if (!res)
- info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
-
- DBUG_RETURN(res);
+
+ DBUG_RETURN(get_metadata(name));
}
@@ -3575,13 +3574,7 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb()
Thd_ndb *thd_ndb;
DBUG_ENTER("seize_thd_ndb");
-#ifdef USE_NDB_POOL
- // Seize from pool
- ndb= Ndb::seize();
- xxxxxxxxxxxxxx error
-#else
thd_ndb= new Thd_ndb();
-#endif
thd_ndb->ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
if (thd_ndb->ndb->init(max_transactions) != 0)
{
@@ -3602,46 +3595,45 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb()
void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
{
DBUG_ENTER("release_thd_ndb");
-#ifdef USE_NDB_POOL
- // Release to pool
- Ndb::release(ndb);
- xxxxxxxxxxxx error
-#else
delete thd_ndb;
-#endif
DBUG_VOID_RETURN;
}
/*
- If this thread already has a Ndb object allocated
+ If this thread already has a Thd_ndb object allocated
in current THD, reuse it. Otherwise
- seize a Ndb object, assign it to current THD and use it.
-
- Having a Ndb object also means that a connection to
- NDB cluster has been opened. The connection is
- checked.
+ seize a Thd_ndb object, assign it to current THD and use it.
*/
-int ha_ndbcluster::check_ndb_connection()
+Ndb* check_ndb_in_thd(THD* thd)
{
- THD *thd= current_thd;
+ DBUG_ENTER("check_ndb_in_thd");
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
- DBUG_ENTER("check_ndb_connection");
if (!thd_ndb)
{
- thd_ndb= seize_thd_ndb();
- if (!thd_ndb)
- DBUG_RETURN(2);
+ if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+ DBUG_RETURN(NULL);
thd->transaction.thd_ndb= thd_ndb;
}
- m_ndb= thd_ndb->ndb;
+ DBUG_RETURN(thd_ndb->ndb);
+}
+
+
+int ha_ndbcluster::check_ndb_connection()
+{
+ THD* thd= current_thd;
+ DBUG_ENTER("check_ndb_connection");
+
+ if (!(m_ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
m_ndb->setDatabaseName(m_dbname);
DBUG_RETURN(0);
}
+
void ndbcluster_close_connection(THD *thd)
{
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
@@ -3659,28 +3651,29 @@ void ndbcluster_close_connection(THD *thd)
Try to discover one table from NDB
*/
-int ndbcluster_discover(const char *dbname, const char *name,
+int ndbcluster_discover(THD* thd, const char *db, const char *name,
const void** frmblob, uint* frmlen)
{
uint len;
const void* data;
const NDBTAB* tab;
+ Ndb* ndb;
DBUG_ENTER("ndbcluster_discover");
- DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+ DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
- Ndb ndb(g_ndb_cluster_connection, dbname);
- ndb.getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ ndb->setDatabaseName(db);
- if (ndb.init())
- ERR_RETURN(ndb.getNdbError());
-
- if (ndb.waitUntilReady(0))
- ERR_RETURN(ndb.getNdbError());
-
- if (!(tab= ndb.getDictionary()->getTable(name)))
- {
- DBUG_PRINT("info", ("Table %s not found", name));
- DBUG_RETURN(1);
+ NDBDICT* dict= ndb->getDictionary();
+ dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
+ dict->invalidateTable(name);
+ if (!(tab= dict->getTable(name)))
+ {
+ const NdbError err= dict->getNdbError();
+ if (err.code == 709)
+ DBUG_RETURN(1);
+ ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
@@ -3702,41 +3695,196 @@ int ndbcluster_discover(const char *dbname, const char *name,
DBUG_RETURN(0);
}
-
-#ifdef USE_DISCOVER_ON_STARTUP
/*
- Dicover tables from NDB Cluster
- - fetch a list of tables from NDB
- - store the frm file for each table on disk
- - if the table has an attached frm file
- - if the database of the table exists
-*/
+ Check if a table exists in NDB
+
+ */
-int ndb_discover_tables()
+int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
+{
+ uint len;
+ const void* data;
+ const NDBTAB* tab;
+ Ndb* ndb;
+ DBUG_ENTER("ndbcluster_table_exists");
+ DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ ndb->setDatabaseName(db);
+
+ NDBDICT* dict= ndb->getDictionary();
+ dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
+ dict->invalidateTable(name);
+ if (!(tab= dict->getTable(name)))
+ {
+ const NdbError err= dict->getNdbError();
+ if (err.code == 709)
+ DBUG_RETURN(0);
+ ERR_RETURN(err);
+ }
+
+ DBUG_PRINT("info", ("Found table %s", tab->getName()));
+ DBUG_RETURN(1);
+}
+
+
+
+extern "C" byte* tables_get_key(const char *entry, uint *length,
+ my_bool not_used __attribute__((unused)))
+{
+ *length= strlen(entry);
+ return (byte*) entry;
+}
+
+
+int ndbcluster_find_files(THD *thd,const char *db,const char *path,
+ const char *wild, bool dir, List<char> *files)
{
uint i;
+ Ndb* ndb;
+ char name[FN_REFLEN];
+ HASH ndb_tables, ok_tables;
NdbDictionary::Dictionary::List list;
- NdbDictionary::Dictionary* dict;
- char path[FN_REFLEN];
- DBUG_ENTER("ndb_discover_tables");
-
- /* List tables in NDB Cluster kernel */
- dict= g_ndb->getDictionary();
+ DBUG_ENTER("ndbcluster_find_files");
+ DBUG_PRINT("enter", ("db: %s", db));
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ if (dir)
+ DBUG_RETURN(0); // Discover of databases not yet supported
+
+ // List tables in NDB
+ NDBDICT *dict= ndb->getDictionary();
if (dict->listObjects(list,
NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError());
-
+
+ if (hash_init(&ndb_tables, system_charset_info,list.count,0,0,
+ (hash_get_key)tables_get_key,0,0))
+ {
+ DBUG_PRINT("error", ("Failed to init HASH ndb_tables"));
+ DBUG_RETURN(-1);
+ }
+
+ if (hash_init(&ok_tables, system_charset_info,32,0,0,
+ (hash_get_key)tables_get_key,0,0))
+ {
+ DBUG_PRINT("error", ("Failed to init HASH ok_tables"));
+ hash_free(&ndb_tables);
+ DBUG_RETURN(-1);
+ }
+
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+ DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
- DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
- if (create_table_from_handler(t.database, t.name, true))
- DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+ // Apply wildcard to list of tables in NDB
+ if (wild)
+ {
+ if (lower_case_table_names)
+ {
+ if (wild_case_compare(files_charset_info, t.name, wild))
+ continue;
+ }
+ else if (wild_compare(t.name,wild,0))
+ continue;
+ }
+ DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));
+ my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
}
- DBUG_RETURN(0);
+
+ char *file_name;
+ List_iterator<char> it(*files);
+ List<char> delete_list;
+ while ((file_name=it++))
+ {
+ DBUG_PRINT("info", ("%s", file_name));
+ if (hash_search(&ndb_tables, file_name, strlen(file_name)))
+ {
+ DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name));
+ // File existed in NDB and as frm file, put in ok_tables list
+ my_hash_insert(&ok_tables, (byte*)file_name);
+ continue;
+ }
+
+ // File is not in NDB, check for .ndb file with this name
+ (void)strxnmov(name, FN_REFLEN,
+ mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
+ DBUG_PRINT("info", ("Check access for %s", name));
+ if (access(name, F_OK))
+ {
+ DBUG_PRINT("info", ("%s did not exist on disk", name));
+ // .ndb file did not exist on disk, another table type
+ continue;
+ }
+
+ DBUG_PRINT("info", ("%s existed on disk", name));
+ // The .ndb file exists on disk, but it's not in list of tables in ndb
+ // Verify that handler agrees table is gone.
+ if (ndbcluster_table_exists(thd, db, file_name) == 0)
+ {
+ DBUG_PRINT("info", ("NDB says %s does not exists", file_name));
+ it.remove();
+ // Put in list of tables to remove from disk
+ delete_list.push_back(thd->strdup(file_name));
+ }
+ }
+
+ // Check for new files to discover
+ DBUG_PRINT("info", ("Checking for new files to discover"));
+ List<char> create_list;
+ for (i= 0 ; i < ndb_tables.records ; i++)
+ {
+ file_name= hash_element(&ndb_tables, i);
+ if (!hash_search(&ok_tables, file_name, strlen(file_name)))
+ {
+ DBUG_PRINT("info", ("%s must be discovered", file_name));
+ // File is in list of ndb tables and not in ok_tables
+ // This table need to be created
+ create_list.push_back(thd->strdup(file_name));
+ }
+ }
+
+ // Lock mutex before deleting and creating frm files
+ pthread_mutex_lock(&LOCK_open);
+
+ if (!global_read_lock)
+ {
+ // Delete old files
+ List_iterator_fast<char> it3(delete_list);
+ while ((file_name=it3++))
+ {
+ DBUG_PRINT("info", ("Remove table %s/%s",db, file_name ));
+ // Delete the table and all related files
+ TABLE_LIST table_list;
+ bzero((char*) &table_list,sizeof(table_list));
+ table_list.db= (char*) db;
+ table_list.real_name=(char*)file_name;
+ (void)mysql_rm_table_part2(thd, &table_list,
+ /* if_exists */ true,
+ /* drop_temporary */ false,
+ /* dont_log_query*/ true);
+ }
+ }
+
+ // Create new files
+ List_iterator_fast<char> it2(create_list);
+ while ((file_name=it2++))
+ {
+ DBUG_PRINT("info", ("Table %s need discovery", name));
+ ha_create_table_from_engine(thd, db, file_name, true);
+ files->push_back(thd->strdup(file_name));
+ }
+
+ pthread_mutex_unlock(&LOCK_open);
+
+ hash_free(&ok_tables);
+ hash_free(&ndb_tables);
+ DBUG_RETURN(0);
}
-#endif
/*
@@ -3791,7 +3939,7 @@ bool ndbcluster_init()
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
- if (res == 0 && ndb_discover_tables() != 0)
+ if (ndb_discover_tables() != 0)
DBUG_RETURN(TRUE);
#endif
DBUG_RETURN(false);
@@ -3807,7 +3955,6 @@ bool ndbcluster_init()
bool ndbcluster_end()
{
DBUG_ENTER("ndbcluster_end");
-
if(g_ndb)
delete g_ndb;
g_ndb= NULL;
@@ -3817,9 +3964,6 @@ bool ndbcluster_end()
if (!ndbcluster_inited)
DBUG_RETURN(0);
hash_free(&ndbcluster_open_tables);
-#ifdef USE_NDB_POOL
- ndb_pool_release();
-#endif
pthread_mutex_destroy(&ndbcluster_mutex);
ndbcluster_inited= 0;
DBUG_RETURN(0);
@@ -4138,7 +4282,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if (pOp == NULL)
break;
- NdbResultSet* rs= pOp->readTuples(NdbScanOperation::LM_Dirty);
+ NdbResultSet* rs= pOp->readTuples(NdbOperation::LM_CommittedRead);
if (rs == 0)
break;
@@ -4178,4 +4322,30 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_RETURN(-1);
}
+/*
+ Create a .ndb file to serve as a placeholder indicating
+ that the table with this name is a ndb table
+*/
+
+int ha_ndbcluster::write_ndb_file()
+{
+ File file;
+ bool error=1;
+ char path[FN_REFLEN];
+
+ DBUG_ENTER("write_ndb_file");
+ DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
+
+ (void)strxnmov(path, FN_REFLEN,
+ mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
+
+ if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
+ {
+ // It's an empty file
+ error=0;
+ my_close(file,MYF(0));
+ }
+ DBUG_RETURN(error);
+}
+
#endif /* HAVE_NDBCLUSTER_DB */