-rw-r--r--   include/my_base.h    |    3
-rw-r--r--   sql/Makefile.am      |   10
-rw-r--r--   sql/discover.cc      |  172
-rwxr-xr-x   sql/ha_ndbcluster.cc | 2943
-rwxr-xr-x   sql/ha_ndbcluster.h  |  218
-rw-r--r--   sql/handler.cc       |   92
-rw-r--r--   sql/handler.h        |   24
-rw-r--r--   sql/lex.h            |    8
-rw-r--r--   sql/mysql_priv.h     |    7
-rw-r--r--   sql/mysqld.cc        |   30
-rw-r--r--   sql/set_var.cc       |    4
-rw-r--r--   sql/sql_base.cc      |   21
-rw-r--r--   sql/sql_class.h      |    8
-rw-r--r--   sql/sql_table.cc     |   29
-rw-r--r--   sql/sql_yacc.yy      |   40
-rw-r--r--   sql/table.cc         |    3
16 files changed, 3564 insertions(+), 48 deletions(-)
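
The heart of this change is the frm "discovery" path: when a table exists in the cluster but its .frm file is missing on a MySQL server's disk, the server can fetch the frm blob from NDB's dictionary and write it back out. As orientation before the diff itself, here is a minimal stand-alone sketch of that read/write round trip (plain C++ with hypothetical read_frm/write_frm names, no MySQL or NDB internals; the real readfrm()/writefrm() in sql/discover.cc below use my_open/my_create and MySQL's error-code conventions):

    #include <fstream>
    #include <optional>
    #include <string>

    // Read a whole .frm file into memory (readfrm() analogue).
    static std::optional<std::string> read_frm(const std::string &path)
    {
      std::ifstream in(path, std::ios::binary);
      if (!in)
        return std::nullopt;                      // "could not open file"
      return std::string(std::istreambuf_iterator<char>(in), {});
    }

    // Write an frm blob back to disk (writefrm() analogue).
    static bool write_frm(const std::string &path, const std::string &frm)
    {
      std::ofstream out(path, std::ios::binary | std::ios::trunc);
      return static_cast<bool>(
          out.write(frm.data(), static_cast<std::streamsize>(frm.size())));
    }
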
diff --git a/include/my_base.h b/include/my_base.h index b38e177fd89..28d78d08401 100644 --- a/include/my_base.h +++ b/include/my_base.h @@ -287,6 +287,9 @@ enum ha_base_keytype { #define HA_ERR_ROW_IS_REFERENCED 152 /* Cannot delete a parent row */ #define HA_ERR_NO_SAVEPOINT 153 /* No savepoint with that name */ #define HA_ERR_NON_UNIQUE_BLOCK_SIZE 154 /* Non unique key block size */ +#define HA_ERR_OLD_METADATA 155 /* The frm file on disk is old */ +#define HA_ERR_TABLE_EXIST 156 /* The table existed in storage engine */ +#define HA_ERR_NO_CONNECTION 157 /* Could not connect to storage engine */ /* Other constants */ diff --git a/sql/Makefile.am b/sql/Makefile.am index 4aa6aaaa1ee..6845e0b5de8 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -16,12 +16,11 @@ #called from the top level Makefile - MYSQLDATAdir = $(localstatedir) MYSQLSHAREdir = $(pkgdatadir) MYSQLBASEdir= $(prefix) INCLUDES = @MT_INCLUDES@ \ - @bdb_includes@ @innodb_includes@ \ + @bdb_includes@ @innodb_includes@ @ndbcluster_includes@ \ -I$(top_srcdir)/include -I$(top_srcdir)/regex \ -I$(srcdir) $(openssl_includes) WRAPLIBS= @WRAPLIBS@ @@ -42,6 +41,7 @@ LDADD = @isam_libs@ \ mysqld_LDADD = @MYSQLD_EXTRA_LDFLAGS@ \ @bdb_libs@ @innodb_libs@ @pstack_libs@ \ @innodb_system_libs@ \ + @ndbcluster_libs@ @ndbcluster_system_libs@ \ $(LDADD) $(CXXLDFLAGS) $(WRAPLIBS) @LIBDL@ @openssl_libs@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ item_strfunc.h item_timefunc.h item_uniq.h \ @@ -52,7 +52,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ field.h handler.h \ ha_isammrg.h ha_isam.h ha_myisammrg.h\ ha_heap.h ha_myisam.h ha_berkeley.h ha_innodb.h \ - opt_range.h protocol.h \ + ha_ndbcluster.h opt_range.h protocol.h \ sql_select.h structs.h table.h sql_udf.h hash_filo.h\ lex.h lex_symbol.h sql_acl.h sql_crypt.h \ log_event.h sql_repl.h slave.h \ @@ -75,11 +75,11 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc \ procedure.cc item_uniq.cc sql_test.cc \ log.cc log_event.cc init.cc derror.cc sql_acl.cc \ unireg.cc des_key_file.cc \ - time.cc opt_range.cc opt_sum.cc \ + discover.cc time.cc opt_range.cc opt_sum.cc \ records.cc filesort.cc handler.cc \ ha_heap.cc ha_myisam.cc ha_myisammrg.cc \ ha_berkeley.cc ha_innodb.cc \ - ha_isam.cc ha_isammrg.cc \ + ha_isam.cc ha_isammrg.cc ha_ndbcluster.cc \ sql_db.cc sql_table.cc sql_rename.cc sql_crypt.cc \ sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \ sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \ diff --git a/sql/discover.cc b/sql/discover.cc new file mode 100644 index 00000000000..e260f44a8db --- /dev/null +++ b/sql/discover.cc @@ -0,0 +1,172 @@ +/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +/* Functions for discover of frm file from handler */ + +#include "mysql_priv.h" +#include <my_dir.h> + +/* + Read the contents of a .frm file + + SYNOPSIS + readfrm() + + name path to table-file "db/name" + frmdata frm data + len length of the read frmdata + + RETURN VALUES + 0 ok + 1 Could not open file + 2 Could not stat file + 3 Could not allocate data for read + Could not read file + + frmdata and len are set to 0 on error +*/ + +int readfrm(const char *name, + const void **frmdata, uint *len) +{ + int error; + char index_file[FN_REFLEN]; + File file; + ulong read_len; + char *read_data; + MY_STAT state; + DBUG_ENTER("readfrm"); + DBUG_PRINT("enter",("name: '%s'",name)); + + *frmdata= NULL; // In case of errors + *len= 0; + error= 1; + if ((file=my_open(fn_format(index_file,name,"",reg_ext,4), + O_RDONLY | O_SHARE, + MYF(0))) < 0) + goto err_end; + + // Get length of file + error= 2; + if (my_fstat(file, &state, MYF(0))) + goto err; + read_len= state.st_size; + + // Read whole frm file + error= 3; + read_data= 0; + if (read_string(file, &read_data, read_len)) + goto err; + + // Setup return data + *frmdata= (void*) read_data; + *len= read_len; + error= 0; + + err: + if (file > 0) + VOID(my_close(file,MYF(MY_WME))); + + err_end: /* Here when no file */ + DBUG_RETURN (error); +} /* readfrm */ + + +/* + Write the content of a frm data pointer + to a frm file + + SYNOPSIS + writefrm() + + name path to table-file "db/name" + frmdata frm data + len length of the frmdata + + RETURN VALUES + 0 ok + 2 Could not write file +*/ + +int writefrm(const char *name, const void *frmdata, uint len) +{ + File file; + char index_file[FN_REFLEN]; + int error; + DBUG_ENTER("writefrm"); + DBUG_PRINT("enter",("name: '%s' len: %d ",name,len)); + //DBUG_DUMP("frmdata", (char*)frmdata, len); + + error= 0; + if ((file=my_create(fn_format(index_file,name,"",reg_ext,4), + CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0) + { + if (my_write(file,(byte*)frmdata,len,MYF(MY_WME | MY_NABP))) + error= 2; + } + VOID(my_close(file,MYF(0))); + DBUG_RETURN(error); +} /* writefrm */ + + + + +/* + Try to discover table from handler and + if found, write the frm file to disk. 
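+
+  (Editorial note, not part of the patch: a caller that only needs an
+  existence check can call
+
+    create_table_from_handler(db, name, false)
+
+  and test for 0, which is exactly what table_exists_in_handler()
+  below wraps.)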
+
+  RETURN VALUES:
+  0  : Table existed in handler and was created
+       on disk if requested
+  1  : Table does not exist
+  >1 : Error
+
+*/
+
+int create_table_from_handler(const char *db,
+                              const char *name,
+                              bool create_if_found)
+{
+  int error= 0;
+  const void* frmblob = NULL;
+  char path[FN_REFLEN];
+  uint frmlen = 0;
+  DBUG_ENTER("create_table_from_handler");
+  DBUG_PRINT("enter", ("create_if_found: %d", create_if_found));
+
+  if (ha_discover(db, name, &frmblob, &frmlen))
+    DBUG_RETURN(1); // Table does not exist
+
+  // Table exists in handler
+  if (create_if_found)
+  {
+    (void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS);
+    // Save the frm file
+    error = writefrm(path, frmblob, frmlen);
+  }
+
+ err:
+  if (frmblob)
+    my_free((char*) frmblob,MYF(0));
+  DBUG_RETURN(error);
+}
+
+int table_exists_in_handler(const char *db,
+                            const char *name)
+{
+  return (create_table_from_handler(db, name, false) == 0);
+}
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
new file mode 100755
index 00000000000..3bc322878d1
--- /dev/null
+++ b/sql/ha_ndbcluster.cc
@@ -0,0 +1,2943 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+/*
+  This file defines the NDB Cluster handler: the interface between MySQL and
+  NDB Cluster
+*/
+
+/*
+  TODO
+  After CREATE DATABASE, run discover on all tables in that database
+*/
+
+#ifdef __GNUC__
+#pragma implementation                          // gcc: Class implementation
+#endif
+
+#include "mysql_priv.h"
+
+#ifdef HAVE_NDBCLUSTER_DB
+#include <my_dir.h>
+#include "ha_ndbcluster.h"
+#include <ndbapi/NdbApi.hpp>
+#include <ndbapi/NdbScanFilter.hpp>
+
+#define USE_DISCOVER_ON_STARTUP
+//#define USE_NDB_POOL
+
+// Default value for parallelism
+static const int parallelism= 240;
+
+#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
+
+#define ERR_PRINT(err) \
+  DBUG_PRINT("error", ("Error: %d  message: %s", err.code, err.message))
+
+#define ERR_RETURN(err) \
+{ \
+  ERR_PRINT(err); \
+  DBUG_RETURN(ndb_to_mysql_error(&err)); \
+}
+
+// Typedefs for long names
+typedef NdbDictionary::Column NDBCOL;
+typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Index NDBINDEX;
+typedef NdbDictionary::Dictionary NDBDICT;
+
+bool ndbcluster_inited= false;
+
+// Handler synchronization
+pthread_mutex_t ndbcluster_mutex;
+
+// Table lock handling
+static HASH ndbcluster_open_tables;
+
+static byte *ndbcluster_get_key(NDB_SHARE *share, uint *length,
+                                my_bool not_used __attribute__((unused)));
+static NDB_SHARE *get_share(const char *table_name);
+static void free_share(NDB_SHARE *share);
+
+static int packfrm(const void *data, uint len,
+                   const void **pack_data, uint *pack_len);
+static int unpackfrm(const void **data, uint *len,
+                     const void *pack_data);
+
+/*
+  Error handling functions
+*/
+
+struct err_code_mapping
+{
+  int ndb_err;
+  int my_err;
+};
+
+static const err_code_mapping err_map[]=
+{
+  { 626,
HA_ERR_KEY_NOT_FOUND }, + { 630, HA_ERR_FOUND_DUPP_KEY }, + { 893, HA_ERR_FOUND_DUPP_UNIQUE }, + { 721, HA_ERR_TABLE_EXIST }, + { 241, HA_ERR_OLD_METADATA }, + { -1, -1 } +}; + + +static int ndb_to_mysql_error(const NdbError *err) +{ + uint i; + for (i=0 ; err_map[i].ndb_err != err->code ; i++) + { + if (err_map[i].my_err == -1) + return err->code; + } + return err_map[i].my_err; +} + + +/* + Take care of the error that occured in NDB + + RETURN + 0 No error + # The mapped error code +*/ + +int ha_ndbcluster::ndb_err(NdbConnection *trans) +{ + const NdbError err= trans->getNdbError(); + if (!err.code) + return 0; // Don't log things to DBUG log if no error + DBUG_ENTER("ndb_err"); + + ERR_PRINT(err); + switch (err.classification) { + case NdbError::SchemaError: + { + NDBDICT *dict= m_ndb->getDictionary(); + DBUG_PRINT("info", ("invalidateTable %s", m_tabname)); + dict->invalidateTable(m_tabname); + break; + } + default: + break; + } + DBUG_RETURN(ndb_to_mysql_error(&err)); +} + + +/* + Instruct NDB to set the value of the hidden primary key +*/ + +bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op, + uint fieldnr, const byte *field_ptr) +{ + DBUG_ENTER("set_hidden_key"); + DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr, + NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0); +} + + +/* + Instruct NDB to set the value of one primary key attribute +*/ + +int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field, + uint fieldnr, const byte *field_ptr) +{ + uint32 pack_len= field->pack_length(); + DBUG_ENTER("set_ndb_key"); + DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d", + fieldnr, field->field_name, field->type(), + pack_len)); + DBUG_DUMP("key", (char*)field_ptr, pack_len); + + switch (field->type()) { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_LONGLONG: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + // Common implementation for most field types + DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + default: + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); + } + DBUG_RETURN(3); +} + + +/* + Instruct NDB to set the value of one attribute +*/ + +int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, + uint fieldnr) +{ + const byte* field_ptr= field->ptr; + uint32 pack_len= field->pack_length(); + DBUG_ENTER("set_ndb_value"); + DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s", + fieldnr, field->field_name, field->type(), + pack_len, field->is_null()?"Y":"N")); + DBUG_DUMP("value", (char*) field_ptr, pack_len); + + if (field->is_null()) + { + // Set value to NULL + DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); + } + + switch (field->type()) { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_LONGLONG: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + case 
MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + // Common implementation for most field types + DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + default: + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); + } + DBUG_RETURN(3); +} + + +/* + Instruct NDB to fetch one field + - data is read directly into buffer provided by field_ptr + if it's NULL, data is read into memory provided by NDBAPI +*/ + +int ha_ndbcluster::get_ndb_value(NdbOperation *op, + uint field_no, byte *field_ptr) +{ + DBUG_ENTER("get_ndb_value"); + DBUG_PRINT("enter", ("field_no: %d", field_no)); + m_value[field_no]= op->getValue(field_no, field_ptr); + DBUG_RETURN(m_value == NULL); +} + + +/* + Get metadata for this table from NDB + + IMPLEMENTATION + - save the NdbDictionary::Table for easy access + - check that frm-file on disk is equal to frm-file + of table accessed in NDB + - build a list of the indexes for the table +*/ + +int ha_ndbcluster::get_metadata(const char *path) +{ + NDBDICT *dict= m_ndb->getDictionary(); + const NDBTAB *tab; + const void *data, *pack_data; + const char **key_name; + uint ndb_columns, mysql_columns, length, pack_length, i; + int error; + DBUG_ENTER("get_metadata"); + DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path)); + + if (!(tab= dict->getTable(m_tabname))) + ERR_RETURN(dict->getNdbError()); + DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion())); + + /* + This is the place to check that the table we got from NDB + is equal to the one on local disk + */ + ndb_columns= (uint) tab->getNoOfColumns(); + mysql_columns= table->fields; + if (table->primary_key == MAX_KEY) + ndb_columns--; + if (ndb_columns != mysql_columns) + { + DBUG_PRINT("error", + ("Wrong number of columns, ndb: %d mysql: %d", + ndb_columns, mysql_columns)); + DBUG_RETURN(HA_ERR_OLD_METADATA); + } + + /* + Compare FrmData in NDB with frm file from disk. 
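+
+     (Editorial note: the comparison below is, in outline,
+
+        readfrm(path, &data, &length);                    // frm image on disk
+        packfrm(data, length, &pack_data, &pack_length);  // same packed form
+        memcmp(pack_data, tab->getFrmData(), pack_length);  // vs copy in NDB
+
+      so a mismatch in either length or bytes means the local frm is stale
+      and get_metadata() returns HA_ERR_OLD_METADATA.)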
+ */ + error= 0; + if (readfrm(path, &data, &length) || + packfrm(data, length, &pack_data, &pack_length)) + { + my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR)); + my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_RETURN(1); + } + + if ((pack_length != tab->getFrmLength()) || + (memcmp(pack_data, tab->getFrmData(), pack_length))) + { + DBUG_PRINT("error", + ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", + pack_length, tab->getFrmLength(), + memcmp(pack_data, tab->getFrmData(), pack_length))); + DBUG_DUMP("pack_data", (char*)pack_data, pack_length); + DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength()); + error= HA_ERR_OLD_METADATA; + } + my_free((char*)data, MYF(0)); + my_free((char*)pack_data, MYF(0)); + if (error) + DBUG_RETURN(error); + + // All checks OK, lets use the table + m_table= (void*)tab; + + for (i= 0; i < MAX_KEY; i++) + m_indextype[i]= UNDEFINED_INDEX; + + // Save information about all known indexes + for (i= 0; i < table->keys; i++) + m_indextype[i] = get_index_type_from_table(i); + + DBUG_RETURN(0); +} + +/* + Decode the type of an index from information + provided in table object +*/ +NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const +{ + if (index_no == table->primary_key) + return PRIMARY_KEY_INDEX; + else + return ((table->key_info[index_no].flags & HA_NOSAME) ? + UNIQUE_INDEX : + ORDERED_INDEX); +} + + +void ha_ndbcluster::release_metadata() +{ + DBUG_ENTER("release_metadata"); + DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); + + m_table= NULL; + + DBUG_VOID_RETURN; +} + +static const ulong index_type_flags[]= +{ + /* UNDEFINED_INDEX */ + 0, + + /* PRIMARY_KEY_INDEX */ + HA_ONLY_WHOLE_INDEX | + HA_WRONG_ASCII_ORDER | + HA_NOT_READ_PREFIX_LAST, + + /* UNIQUE_INDEX */ + HA_ONLY_WHOLE_INDEX | + HA_WRONG_ASCII_ORDER | + HA_NOT_READ_PREFIX_LAST, + + /* ORDERED_INDEX */ + HA_READ_NEXT | + HA_READ_PREV | + HA_NOT_READ_AFTER_KEY +}; + +static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong); + +inline const char* ha_ndbcluster::get_index_name(uint idx_no) const +{ + return table->keynames.type_names[idx_no]; +} + +inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const +{ + DBUG_ASSERT(idx_no < MAX_KEY); + return m_indextype[idx_no]; +} + + +/* + Get the flags for an index + + RETURN + flags depending on the type of the index. 
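+
+    (Editorial note: the lookup is a plain array index,
+     index_type_flags[get_index_type_from_table(idx_no)], so for example
+     an ORDERED_INDEX advertises HA_READ_NEXT | HA_READ_PREV |
+     HA_NOT_READ_AFTER_KEY, while the hash-based index types only allow
+     whole-key reads.)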
+*/ + +inline ulong ha_ndbcluster::index_flags(uint idx_no) const +{ + DBUG_ENTER("index_flags"); + DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size); + DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]); +} + + +int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key) +{ + KEY* key_info= table->key_info + table->primary_key; + KEY_PART_INFO* key_part= key_info->key_part; + KEY_PART_INFO* end= key_part+key_info->key_parts; + DBUG_ENTER("set_primary_key"); + + for (; key_part != end; key_part++) + { + Field* field= key_part->field; + if (set_ndb_key(op, field, + key_part->fieldnr-1, key)) + ERR_RETURN(op->getNdbError()); + key += key_part->length; + } + DBUG_RETURN(0); +} + + +int ha_ndbcluster::set_primary_key(NdbOperation *op) +{ + DBUG_ENTER("set_primary_key"); + KEY* key_info= table->key_info + table->primary_key; + KEY_PART_INFO* key_part= key_info->key_part; + KEY_PART_INFO* end= key_part+key_info->key_parts; + + for (; key_part != end; key_part++) + { + Field* field= key_part->field; + if (set_ndb_key(op, field, + key_part->fieldnr-1, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + DBUG_RETURN(0); +} + + +/* + Read one record from NDB using primary key +*/ + +int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) +{ + uint no_fields= table->fields, i; + NdbConnection *trans= m_active_trans; + NdbOperation *op; + THD *thd= current_thd; + DBUG_ENTER("pk_read"); + DBUG_PRINT("enter", ("key_len: %u", key_len)); + DBUG_DUMP("key", (char*)key, key_len); + + if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0) + goto err; + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + DBUG_DUMP("key", (char*)key, 8); + if (set_hidden_key(op, no_fields, key)) + goto err; + // Read key at the same time, for future reference + if (get_ndb_value(op, no_fields, NULL)) + goto err; + } + else + { + int res; + if ((res= set_primary_key(op, key))) + return res; + } + + // Read non-key field(s) + for (i= 0; i < no_fields; i++) + { + Field *field= table->field[i]; + if (thd->query_id == field->query_id) + { + if (get_ndb_value(op, i, field->ptr)) + goto err; + } + else + { + // Attribute was not to be read + m_value[i]= NULL; + } + } + + if (trans->execute(NoCommit, IgnoreError) != 0) + { + table->status= STATUS_NOT_FOUND; + DBUG_RETURN(ndb_err(trans)); + } + + // The value have now been fetched from NDB + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); + + err: + ERR_RETURN(trans->getNdbError()); +} + + +/* + Read one record from NDB using unique secondary index +*/ + +int ha_ndbcluster::unique_index_read(const byte *key, + uint key_len, byte *buf) +{ + NdbConnection *trans= m_active_trans; + const char *index_name; + NdbIndexOperation *op; + THD *thd= current_thd; + byte *key_ptr; + KEY* key_info; + KEY_PART_INFO *key_part, *end; + uint i; + DBUG_ENTER("unique_index_read"); + DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index)); + DBUG_DUMP("key", (char*)key, key_len); + + index_name= get_index_name(active_index); + if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) || + op->readTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + // Set secondary index key(s) + key_ptr= (byte *) key; + key_info= table->key_info + active_index; + DBUG_ASSERT(key_info->key_length == key_len); + end= (key_part= key_info->key_part) + key_info->key_parts; + + for (i= 0; key_part != end; key_part++, i++) + { + if 
(set_ndb_key(op, key_part->field, i, key_ptr)) + ERR_RETURN(trans->getNdbError()); + key_ptr+= key_part->length; + } + + // Get non-index attribute(s) + for (i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) || + (field->flags & PRI_KEY_FLAG)) + { + if (get_ndb_value(op, i, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + else + { + // Attribute was not to be read + m_value[i]= NULL; + } + } + + if (trans->execute(NoCommit, IgnoreError) != 0) + { + table->status= STATUS_NOT_FOUND; + DBUG_RETURN(ndb_err(trans)); + } + // The value have now been fetched from NDB + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); +} + +/* + Get the next record of a started scan +*/ + +inline int ha_ndbcluster::next_result(byte *buf) +{ + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor= m_active_cursor; + DBUG_ENTER("next_result"); + + if (cursor->nextResult() == 0) + { + // One more record found + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); + } + table->status= STATUS_NOT_FOUND; + if (ndb_err(trans)) + ERR_RETURN(trans->getNdbError()); + + // No more records + DBUG_PRINT("info", ("No more records")); + DBUG_RETURN(HA_ERR_END_OF_FILE); +} + + +/* + Read record(s) from NDB using ordered index scan +*/ + +int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag) +{ + uint no_fields= table->fields; + uint tot_len, i; + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor= m_active_cursor; + NdbScanOperation *op; + const char *bound_str= NULL; + const char *index_name; + NdbOperation::BoundType bound_type = NdbOperation::BoundEQ; + bool can_be_handled_by_ndb= FALSE; + byte *key_ptr; + KEY *key_info; + THD* thd = current_thd; + DBUG_ENTER("ordered_index_scan"); + DBUG_PRINT("enter", ("index: %u", active_index)); + DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); + + index_name= get_index_name(active_index); + if (!(op= trans->getNdbScanOperation(index_name, m_tabname))) + ERR_RETURN(trans->getNdbError()); + if (!(cursor= op->readTuples(parallelism))) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + + switch (find_flag) { + case HA_READ_KEY_EXACT: /* Find first record else error */ + bound_str= "HA_READ_KEY_EXACT"; + bound_type= NdbOperation::BoundEQ; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_KEY_OR_NEXT: /* Record or next record */ + bound_str= "HA_READ_KEY_OR_NEXT"; + bound_type= NdbOperation::BoundLE; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_KEY_OR_PREV: /* Record or previous */ + bound_str= "HA_READ_KEY_OR_PREV"; + bound_type= NdbOperation::BoundGE; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_AFTER_KEY: /* Find next rec. after key-record */ + bound_str= "HA_READ_AFTER_KEY"; + bound_type= NdbOperation::BoundLT; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_BEFORE_KEY: /* Find next rec. 
before key-record */ + bound_str= "HA_READ_BEFORE_KEY"; + bound_type= NdbOperation::BoundGT; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_PREFIX: /* Key which as same prefix */ + bound_str= "HA_READ_PREFIX"; + break; + case HA_READ_PREFIX_LAST: /* Last key with the same prefix */ + bound_str= "HA_READ_PREFIX_LAST"; + break; + case HA_READ_PREFIX_LAST_OR_PREV: + /* Last or prev key with the same prefix */ + bound_str= "HA_READ_PREFIX_LAST_OR_PREV"; + break; + default: + bound_str= "UNKNOWN"; + break; + } + DBUG_PRINT("info", ("find_flag: %s, bound_type: %d," + "can_be_handled_by_ndb: %d", + bound_str, bound_type, can_be_handled_by_ndb)); + if (!can_be_handled_by_ndb) + DBUG_RETURN(1); + + // Set bounds using key data + tot_len= 0; + key_ptr= (byte *) key; + key_info= table->key_info + active_index; + for (i= 0; i < key_info->key_parts; i++) + { + Field* field= key_info->key_part[i].field; + uint32 field_len= field->pack_length(); + DBUG_PRINT("info", ("Set index bound on %s", + field->field_name)); + DBUG_DUMP("key", (char*)key_ptr, field_len); + + if (op->setBound(field->field_name, + bound_type, + key_ptr, + field_len) != 0) + ERR_RETURN(op->getNdbError()); + + key_ptr+= field_len; + tot_len+= field_len; + if (tot_len >= key_len) + break; + } + + // Define attributes to read + for (i= 0; i < no_fields; i++) + { + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) || + (field->flags & PRI_KEY_FLAG)) + { + if (get_ndb_value(op, i, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + else + { + m_value[i]= NULL; + } + } + + if (table->primary_key == MAX_KEY) + { + DBUG_PRINT("info", ("Getting hidden key")); + // Scanning table with no primary key + int hidden_no= no_fields; +#ifndef DBUG_OFF + const NDBTAB *tab= (NDBTAB *) m_table; + if (!tab->getColumn(hidden_no)) + DBUG_RETURN(1); +#endif + if (get_ndb_value(op, hidden_no, NULL)) + ERR_RETURN(op->getNdbError()); + } + + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); +} + + +#if 0 +/* + Read record(s) from NDB using full table scan with filter + */ + +int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag) +{ + uint no_fields= table->fields; + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor= m_active_cursor; + + DBUG_ENTER("filtered_scan"); + DBUG_PRINT("enter", ("key_len: %u, index: %u", + key_len, active_index)); + DBUG_DUMP("key", (char*)key, key_len); + DBUG_PRINT("info", ("Starting a new filtered scan on %s", + m_tabname)); + NdbScanOperation *op= trans->getNdbScanOperation(m_tabname); + if (!op) + ERR_RETURN(trans->getNdbError()); + + cursor= op->readTuples(parallelism); + if (!cursor) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + + { + // Start scan filter + NdbScanFilter sf(op); + sf.begin(); + + // Set filter using the supplied key data + byte *key_ptr= (byte *) key; + uint tot_len= 0; + KEY* key_info= table->key_info + active_index; + for (uint k= 0; k < key_info->key_parts; k++) + { + KEY_PART_INFO* key_part= key_info->key_part+k; + Field* field= key_part->field; + uint ndb_fieldnr= key_part->fieldnr-1; + DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr)); + // const NDBCOL *col= tab->getColumn(ndb_fieldnr); + uint32 field_len= field->pack_length(); + DBUG_DUMP("key", (char*)key, field_len); + + DBUG_PRINT("info", ("Column %s, type: %d, len: %d", + field->field_name, field->real_type(), field_len)); + + // 
Define scan filter + if (field->real_type() == MYSQL_TYPE_STRING) + sf.eq(ndb_fieldnr, key_ptr, field_len); + else + { + if (field_len == 8) + sf.eq(ndb_fieldnr, (Uint64)*key_ptr); + else if (field_len <= 4) + sf.eq(ndb_fieldnr, (Uint32)*key_ptr); + else + DBUG_RETURN(1); + } + + key_ptr += field_len; + tot_len += field_len; + + if (tot_len >= key_len) + break; + } + // End scan filter + sf.end(); + } + + // Define attributes to read + for (uint field_no= 0; field_no < no_fields; field_no++) + { + Field *field= table->field[field_no]; + + // Read attribute + DBUG_PRINT("get", ("%d: %s", field_no, field->field_name)); + if (get_ndb_value(op, field_no, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + + if (table->primary_key == MAX_KEY) + { + DBUG_PRINT("info", ("Getting hidden key")); + // Scanning table with no primary key + int hidden_no= no_fields; +#ifndef DBUG_OFF + const NDBTAB *tab= (NDBTAB *) m_table; + if (!tab->getColumn(hidden_no)) + DBUG_RETURN(1); +#endif + if (get_ndb_value(op, hidden_no, NULL)) + ERR_RETURN(op->getNdbError()); + } + + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); +} +#endif + + +/* + Read records from NDB using full table scan + */ + +int ha_ndbcluster::full_table_scan(byte *buf) +{ + uint i; + THD *thd= current_thd; + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor; + NdbScanOperation *op; + + DBUG_ENTER("full_table_scan"); + DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); + + if (!(op=trans->getNdbScanOperation(m_tabname))) + ERR_RETURN(trans->getNdbError()); + if (!(cursor= op->readTuples(parallelism))) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + + // Define attributes to read + for (i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) || + (field->flags & PRI_KEY_FLAG)) + { + if (get_ndb_value(op, i, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + else + { + m_value[i]= NULL; + } + } + + if (table->primary_key == MAX_KEY) + { + DBUG_PRINT("info", ("Getting hidden key")); + // Scanning table with no primary key + int hidden_no= table->fields; +#ifndef DBUG_OFF + const NDBTAB *tab= (NDBTAB *) m_table; + if (!tab->getColumn(hidden_no)) + DBUG_RETURN(1); +#endif + if (get_ndb_value(op, hidden_no, NULL)) + ERR_RETURN(op->getNdbError()); + } + + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); +} + + +/* + Insert one record into NDB +*/ + +int ha_ndbcluster::write_row(byte *record) +{ + uint i; + NdbConnection *trans= m_active_trans; + NdbOperation *op; + int res; + DBUG_ENTER("write_row"); + + statistic_increment(ha_write_count,&LOCK_status); + if (table->timestamp_default_now) + update_timestamp(record+table->timestamp_default_now-1); + if (table->next_number_field && record == table->record[0]) + update_auto_increment(); + + if (!(op= trans->getNdbOperation(m_tabname))) + ERR_RETURN(trans->getNdbError()); + + res= (m_use_write) ? 
op->writeTuple() :op->insertTuple(); + if (res != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // Table has hidden primary key + Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname); + if (set_hidden_key(op, table->fields, (const byte*)&auto_value)) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key(op))) + return res; + } + + // Set non-key attribute(s) + for (i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + if (!(field->flags & PRI_KEY_FLAG) && + set_ndb_value(op, field, i)) + ERR_RETURN(op->getNdbError()); + } + + /* + Execute write operation + NOTE When doing inserts with many values in + each INSERT statement it should not be necessary + to NoCommit the transaction between each row. + Find out how this is detected! + */ + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_RETURN(0); +} + + +/* Compare if a key in a row has changed */ + +int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row, + const byte * new_row) +{ + KEY_PART_INFO *key_part=table->key_info[keynr].key_part; + KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts; + + for (; key_part != end ; key_part++) + { + if (key_part->null_bit) + { + if ((old_row[key_part->null_offset] & key_part->null_bit) != + (new_row[key_part->null_offset] & key_part->null_bit)) + return 1; + } + if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH)) + { + + if (key_part->field->cmp_binary((char*) (old_row + key_part->offset), + (char*) (new_row + key_part->offset), + (ulong) key_part->length)) + return 1; + } + else + { + if (memcmp(old_row+key_part->offset, new_row+key_part->offset, + key_part->length)) + return 1; + } + } + return 0; +} + +/* + Update one record in NDB using primary key +*/ + +int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) +{ + THD *thd= current_thd; + NdbConnection *trans= m_active_trans; + NdbOperation *op; + uint i; + DBUG_ENTER("update_row"); + + statistic_increment(ha_update_count,&LOCK_status); + if (table->timestamp_on_update_now) + update_timestamp(new_data+table->timestamp_on_update_now-1); + + if (!(op= trans->getNdbOperation(m_tabname)) || + op->updateTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + + // Require that the PK for this record has previously been + // read into m_value + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec); + DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + /* Check for update of primary key and return error */ + if (key_cmp(table->primary_key, old_data, new_data)) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + + int res; + if ((res= set_primary_key(op, old_data + table->null_bytes))) + DBUG_RETURN(res); + } + + // Set non-key attribute(s) + for (i= 0; i < table->fields; i++) + { + + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) && + (!(field->flags & PRI_KEY_FLAG)) && + set_ndb_value(op, field, i)) + ERR_RETURN(op->getNdbError()); + } + + // Execute update operation + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + + DBUG_RETURN(0); +} + + +/* + Delete one record from NDB, using primary key +*/ + +int ha_ndbcluster::delete_row(const byte *record) +{ + NdbConnection *trans= 
m_active_trans; + NdbOperation *op; + DBUG_ENTER("delete_row"); + + statistic_increment(ha_delete_count,&LOCK_status); + + if (!(op=trans->getNdbOperation(m_tabname)) || + op->deleteTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec != NULL); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key(op))) + return res; + } + + // Execute delete operation + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_RETURN(0); +} + +/* + Unpack a record read from NDB + + SYNOPSIS + unpack_record() + buf Buffer to store read row + + NOTE + The data for each row is read directly into the + destination buffer. This function is primarily + called in order to check if any fields should be + set to null. +*/ + +void ha_ndbcluster::unpack_record(byte* buf) +{ + uint row_offset= (uint) (buf - table->record[0]); + Field **field, **end; + NdbRecAttr **value= m_value; + DBUG_ENTER("unpack_record"); + + // Set null flag(s) + bzero(buf, table->null_bytes); + for (field= table->field, end= field+table->fields; + field < end; + field++, value++) + { + if (*value && (*value)->isNULL()) + (*field)->set_null(row_offset); + } + +#ifndef DBUG_OFF + // Read and print all values that was fetched + if (table->primary_key == MAX_KEY) + { + // Table with hidden primary key + int hidden_no= table->fields; + const NDBTAB *tab= (NDBTAB *) m_table; + const NDBCOL *hidden_col= tab->getColumn(hidden_no); + NdbRecAttr* rec= m_value[hidden_no]; + DBUG_ASSERT(rec); + DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, + hidden_col->getName(), rec->u_64_value())); + } + print_results(); +#endif + DBUG_VOID_RETURN; +} + + +/* + Utility function to print/dump the fetched field + */ + +void ha_ndbcluster::print_results() +{ + const NDBTAB *tab= (NDBTAB*) m_table; + DBUG_ENTER("print_results"); + +#ifndef DBUG_OFF + if (!_db_on_) + DBUG_VOID_RETURN; + + for (uint f=0; f<table->fields;f++) + { + Field *field; + const NDBCOL *col; + NdbRecAttr *value; + + if (!(value= m_value[f])) + { + fprintf(DBUG_FILE, "Field %d was not read\n", f); + continue; + } + field= table->field[f]; + DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length()); + col= tab->getColumn(f); + fprintf(DBUG_FILE, "%d: %s\t", f, col->getName()); + + if (value->isNULL()) + { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } + + switch (col->getType()) { + case NdbDictionary::Column::Blob: + case NdbDictionary::Column::Undefined: + fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); + break; + case NdbDictionary::Column::Tinyint: { + char value= *field->ptr; + fprintf(DBUG_FILE, "Tinyint\t%d", value); + break; + } + case NdbDictionary::Column::Tinyunsigned: { + unsigned char value= *field->ptr; + fprintf(DBUG_FILE, "Tinyunsigned\t%u", value); + break; + } + case NdbDictionary::Column::Smallint: { + short value= *field->ptr; + fprintf(DBUG_FILE, "Smallint\t%d", value); + break; + } + case NdbDictionary::Column::Smallunsigned: { + unsigned short value= *field->ptr; + fprintf(DBUG_FILE, "Smallunsigned\t%u", value); + break; + } + case NdbDictionary::Column::Mediumint: { + byte value[3]; + memcpy(value, field->ptr, 3); + fprintf(DBUG_FILE, "Mediumint\t%d,%d,%d", value[0], value[1], value[2]); + break; + } + case 
NdbDictionary::Column::Mediumunsigned: { + byte value[3]; + memcpy(value, field->ptr, 3); + fprintf(DBUG_FILE, "Mediumunsigned\t%u,%u,%u", value[0], value[1], value[2]); + break; + } + case NdbDictionary::Column::Int: { + fprintf(DBUG_FILE, "Int\t%lld", field->val_int()); + break; + } + case NdbDictionary::Column::Unsigned: { + Uint32 value= (Uint32) *field->ptr; + fprintf(DBUG_FILE, "Unsigned\t%u", value); + break; + } + case NdbDictionary::Column::Bigint: { + Int64 value= (Int64) *field->ptr; + fprintf(DBUG_FILE, "Bigint\t%lld", value); + break; + } + case NdbDictionary::Column::Bigunsigned: { + Uint64 value= (Uint64) *field->ptr; + fprintf(DBUG_FILE, "Bigunsigned\t%llu", value); + break; + } + case NdbDictionary::Column::Float: { + float value= (float) *field->ptr; + fprintf(DBUG_FILE, "Float\t%f", value); + break; + } + case NdbDictionary::Column::Double: { + double value= (double) *field->ptr; + fprintf(DBUG_FILE, "Double\t%f", value); + break; + } + case NdbDictionary::Column::Decimal: { + char *value= field->ptr; + + fprintf(DBUG_FILE, "Decimal\t'%-*s'", field->pack_length(), value); + break; + } + case NdbDictionary::Column::Char:{ + char buf[field->pack_length()+1]; + char *value= (char *) field->ptr; + snprintf(buf, field->pack_length(), "%s", value); + fprintf(DBUG_FILE, "Char\t'%s'", buf); + break; + } + case NdbDictionary::Column::Varchar: + case NdbDictionary::Column::Binary: + case NdbDictionary::Column::Varbinary: { + char *value= (char *) field->ptr; + fprintf(DBUG_FILE, "'%s'", value); + break; + } + case NdbDictionary::Column::Datetime: { + Uint64 value= (Uint64) *field->ptr; + fprintf(DBUG_FILE, "Datetime\t%llu", value); + break; + } + case NdbDictionary::Column::Timespec: { + Uint64 value= (Uint64) *field->ptr; + fprintf(DBUG_FILE, "Timespec\t%llu", value); + break; + } + } + fprintf(DBUG_FILE, "\n"); + + } +#endif + DBUG_VOID_RETURN; +} + + +int ha_ndbcluster::index_init(uint index) +{ + DBUG_ENTER("index_init"); + DBUG_PRINT("enter", ("index: %u", index)); + DBUG_RETURN(handler::index_init(index)); +} + + +int ha_ndbcluster::index_end() +{ + DBUG_ENTER("index_end"); + DBUG_RETURN(rnd_end()); +} + + +int ha_ndbcluster::index_read(byte *buf, + const byte *key, uint key_len, + enum ha_rkey_function find_flag) +{ + DBUG_ENTER("index_read"); + DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", + active_index, key_len, find_flag)); + + int error= 1; + statistic_increment(ha_read_key_count, &LOCK_status); + + switch (get_index_type(active_index)){ + case PRIMARY_KEY_INDEX: + error= pk_read(key, key_len, buf); + break; + + case UNIQUE_INDEX: + error= unique_index_read(key, key_len, buf); + break; + + case ORDERED_INDEX: + error= ordered_index_scan(key, key_len, buf, find_flag); + break; + + default: + case UNDEFINED_INDEX: + break; + } + DBUG_RETURN(error); +} + + +int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, + const byte *key, uint key_len, + enum ha_rkey_function find_flag) +{ + statistic_increment(ha_read_key_count,&LOCK_status); + DBUG_ENTER("index_read_idx"); + DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len)); + index_init(index_no); + DBUG_RETURN(index_read(buf, key, key_len, find_flag)); +} + + +int ha_ndbcluster::index_next(byte *buf) +{ + DBUG_ENTER("index_next"); + + int error = 1; + statistic_increment(ha_read_next_count,&LOCK_status); + if (get_index_type(active_index) == PRIMARY_KEY_INDEX) + error= HA_ERR_END_OF_FILE; + else + error = next_result(buf); + DBUG_RETURN(error); +} + + +int 
ha_ndbcluster::index_prev(byte *buf) +{ + DBUG_ENTER("index_prev"); + statistic_increment(ha_read_prev_count,&LOCK_status); + DBUG_RETURN(1); +} + + +int ha_ndbcluster::index_first(byte *buf) +{ + DBUG_ENTER("index_first"); + statistic_increment(ha_read_first_count,&LOCK_status); + DBUG_RETURN(1); +} + + +int ha_ndbcluster::index_last(byte *buf) +{ + DBUG_ENTER("index_last"); + statistic_increment(ha_read_last_count,&LOCK_status); + DBUG_RETURN(1); +} + + +int ha_ndbcluster::rnd_init(bool scan) +{ + NdbResultSet *cursor= m_active_cursor; + DBUG_ENTER("rnd_init"); + DBUG_PRINT("enter", ("scan: %d", scan)); + // Check that cursor is not defined + if (cursor) + DBUG_RETURN(1); + index_init(table->primary_key); + DBUG_RETURN(0); +} + + +int ha_ndbcluster::rnd_end() +{ + NdbResultSet *cursor= m_active_cursor; + DBUG_ENTER("rnd_end"); + + if (cursor) + { + DBUG_PRINT("info", ("Closing the cursor")); + cursor->close(); + m_active_cursor= NULL; + } + DBUG_RETURN(0); +} + + +int ha_ndbcluster::rnd_next(byte *buf) +{ + DBUG_ENTER("rnd_next"); + statistic_increment(ha_read_rnd_next_count, &LOCK_status); + int error = 1; + if (!m_active_cursor) + error = full_table_scan(buf); + else + error = next_result(buf); + DBUG_RETURN(error); +} + + +/* + An "interesting" record has been found and it's pk + retrieved by calling position + Now it's time to read the record from db once + again +*/ + +int ha_ndbcluster::rnd_pos(byte *buf, byte *pos) +{ + DBUG_ENTER("rnd_pos"); + statistic_increment(ha_read_rnd_count,&LOCK_status); + // The primary key for the record is stored in pos + // Perform a pk_read using primary key "index" + DBUG_RETURN(pk_read(pos, ref_length, buf)); +} + + +/* + Store the primary key of this record in ref + variable, so that the row can be retrieved again later + using "reference" in rnd_pos +*/ + +void ha_ndbcluster::position(const byte *record) +{ + KEY *key_info; + KEY_PART_INFO *key_part; + KEY_PART_INFO *end; + byte *buff; + DBUG_ENTER("position"); + + if (table->primary_key != MAX_KEY) + { + key_info= table->key_info + table->primary_key; + key_part= key_info->key_part; + end= key_part + key_info->key_parts; + buff= ref; + + for (; key_part != end; key_part++) + { + if (key_part->null_bit) { + /* Store 0 if the key part is a NULL part */ + if (record[key_part->null_offset] + & key_part->null_bit) { + *buff++= 1; + continue; + } + *buff++= 0; + } + memcpy(buff, record + key_part->offset, key_part->length); + buff += key_part->length; + } + } + else + { + // No primary key, get hidden key + DBUG_PRINT("info", ("Getting hidden key")); + int hidden_no= table->fields; + NdbRecAttr* rec= m_value[hidden_no]; + const NDBTAB *tab= (NDBTAB *) m_table; + const NDBCOL *hidden_col= tab->getColumn(hidden_no); + DBUG_ASSERT(hidden_col->getPrimaryKey() && + hidden_col->getAutoIncrement() && + rec != NULL && + ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH); + memcpy(ref, (const void*)rec->aRef(), ref_length); + } + + DBUG_DUMP("ref", (char*)ref, ref_length); + DBUG_VOID_RETURN; +} + + +void ha_ndbcluster::info(uint flag) +{ + DBUG_ENTER("info"); + DBUG_PRINT("enter", ("flag: %d", flag)); + + if (flag & HA_STATUS_POS) + DBUG_PRINT("info", ("HA_STATUS_POS")); + if (flag & HA_STATUS_NO_LOCK) + DBUG_PRINT("info", ("HA_STATUS_NO_LOCK")); + if (flag & HA_STATUS_TIME) + DBUG_PRINT("info", ("HA_STATUS_TIME")); + if (flag & HA_STATUS_CONST) + DBUG_PRINT("info", ("HA_STATUS_CONST")); + if (flag & HA_STATUS_VARIABLE) + DBUG_PRINT("info", ("HA_STATUS_VARIABLE")); + if (flag & HA_STATUS_ERRKEY) + 
DBUG_PRINT("info", ("HA_STATUS_ERRKEY")); + if (flag & HA_STATUS_AUTO) + DBUG_PRINT("info", ("HA_STATUS_AUTO")); + DBUG_VOID_RETURN; +} + + +int ha_ndbcluster::extra(enum ha_extra_function operation) +{ + DBUG_ENTER("extra"); + switch (operation) { + case HA_EXTRA_NORMAL: /* Optimize for space (def) */ + DBUG_PRINT("info", ("HA_EXTRA_NORMAL")); + break; + case HA_EXTRA_QUICK: /* Optimize for speed */ + DBUG_PRINT("info", ("HA_EXTRA_QUICK")); + break; + case HA_EXTRA_RESET: /* Reset database to after open */ + DBUG_PRINT("info", ("HA_EXTRA_RESET")); + break; + case HA_EXTRA_CACHE: /* Cash record in HA_rrnd() */ + DBUG_PRINT("info", ("HA_EXTRA_CACHE")); + break; + case HA_EXTRA_NO_CACHE: /* End cacheing of records (def) */ + DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE")); + break; + case HA_EXTRA_NO_READCHECK: /* No readcheck on update */ + DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK")); + break; + case HA_EXTRA_READCHECK: /* Use readcheck (def) */ + DBUG_PRINT("info", ("HA_EXTRA_READCHECK")); + break; + case HA_EXTRA_KEYREAD: /* Read only key to database */ + DBUG_PRINT("info", ("HA_EXTRA_KEYREAD")); + break; + case HA_EXTRA_NO_KEYREAD: /* Normal read of records (def) */ + DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD")); + break; + case HA_EXTRA_NO_USER_CHANGE: /* No user is allowed to write */ + DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE")); + break; + case HA_EXTRA_KEY_CACHE: + DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE")); + break; + case HA_EXTRA_NO_KEY_CACHE: + DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE")); + break; + case HA_EXTRA_WAIT_LOCK: /* Wait until file is avalably (def) */ + DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK")); + break; + case HA_EXTRA_NO_WAIT_LOCK: /* If file is locked, return quickly */ + DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK")); + break; + case HA_EXTRA_WRITE_CACHE: /* Use write cache in ha_write() */ + DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE")); + break; + case HA_EXTRA_FLUSH_CACHE: /* flush write_record_cache */ + DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE")); + break; + case HA_EXTRA_NO_KEYS: /* Remove all update of keys */ + DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS")); + break; + case HA_EXTRA_KEYREAD_CHANGE_POS: /* Keyread, but change pos */ + DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */ + break; + case HA_EXTRA_REMEMBER_POS: /* Remember pos for next/prev */ + DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS")); + break; + case HA_EXTRA_RESTORE_POS: + DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS")); + break; + case HA_EXTRA_REINIT_CACHE: /* init cache from current record */ + DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE")); + break; + case HA_EXTRA_FORCE_REOPEN: /* Datafile have changed on disk */ + DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN")); + break; + case HA_EXTRA_FLUSH: /* Flush tables to disk */ + DBUG_PRINT("info", ("HA_EXTRA_FLUSH")); + break; + case HA_EXTRA_NO_ROWS: /* Don't write rows */ + DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS")); + break; + case HA_EXTRA_RESET_STATE: /* Reset positions */ + DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE")); + break; + case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/ + DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY")); + + DBUG_PRINT("info", ("Turning ON use of write instead of insert")); + m_use_write= TRUE; + break; + case HA_EXTRA_NO_IGNORE_DUP_KEY: + DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY")); + DBUG_PRINT("info", ("Turning OFF use of write instead of insert")); + m_use_write= false; + break; + case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those + 
where field->query_id is the same as + the current query id */ + DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS")); + break; + case HA_EXTRA_PREPARE_FOR_DELETE: + DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE")); + break; + case HA_EXTRA_PREPARE_FOR_UPDATE: /* Remove read cache if problems */ + DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE")); + break; + case HA_EXTRA_PRELOAD_BUFFER_SIZE: + DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE")); + break; + case HA_EXTRA_RETRIEVE_PRIMARY_KEY: + DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY")); + break; + case HA_EXTRA_CHANGE_KEY_TO_UNIQUE: + DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE")); + break; + case HA_EXTRA_CHANGE_KEY_TO_DUP: + DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP")); + break; + + } + + DBUG_RETURN(0); +} + + +int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) +{ + DBUG_ENTER("extra_opt"); + DBUG_PRINT("enter", ("cache_size: %d", cache_size)); + DBUG_RETURN(extra(operation)); +} + + +int ha_ndbcluster::reset() +{ + DBUG_ENTER("reset"); + // Reset what? + DBUG_RETURN(1); +} + + +const char **ha_ndbcluster::bas_ext() const +{ static const char *ext[1] = { NullS }; return ext; } + + +/* + How many seeks it will take to read through the table + This is to be comparable to the number returned by records_in_range so + that we can decide if we should scan the table or use keys. +*/ + +double ha_ndbcluster::scan_time() +{ + return rows2double(records/3); +} + + +THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + DBUG_ENTER("store_lock"); + + if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK) + { + + /* If we are not doing a LOCK TABLE, then allow multiple + writers */ + + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && !thd->in_lock_tables) + lock_type= TL_WRITE_ALLOW_WRITE; + + /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ... + MySQL would use the lock TL_READ_NO_INSERT on t2, and that + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts + to t2. Convert the lock to a normal read lock to allow + concurrent inserts to t2. */ + + if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables) + lock_type= TL_READ; + + m_lock.type=lock_type; + } + *to++= &m_lock; + + DBUG_RETURN(to); +} + +#ifndef DBUG_OFF +#define PRINT_OPTION_FLAGS(t) { \ + if (t->options & OPTION_NOT_AUTOCOMMIT) \ + DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); \ + if (t->options & OPTION_BEGIN) \ + DBUG_PRINT("thd->options", ("OPTION_BEGIN")); \ + if (t->options & OPTION_TABLE_LOCK) \ + DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); \ +} +#else +#define PRINT_OPTION_FLAGS(t) +#endif + + +/* + As MySQL will execute an external lock for every new table it uses + we can use this to start the transactions. + If we are in auto_commit mode we just need to start a transaction + for the statement, this will be stored in transaction.stmt. 
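+
+  (Editorial note: concretely, the autocommit branch below does
+
+     trans= m_ndb->startTransaction();
+     thd->transaction.stmt.ndb_tid= trans;
+
+   i.e. one NDB transaction per statement.)
+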
+  If not, we have to start a master transaction if one does not already
+  exist; it will be stored in transaction.all
+
+  When a table lock is held, one transaction is started to hold the
+  table lock, and for each statement a hupp transaction is started
+*/
+
+int ha_ndbcluster::external_lock(THD *thd, int lock_type)
+{
+  int error=0;
+  NdbConnection* trans= NULL;
+
+  DBUG_ENTER("external_lock");
+  DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
+                       thd->transaction.ndb_lock_count));
+
+  /*
+    Check that this handler instance has a connection
+    set up to the Ndb object of thd
+  */
+  if (check_ndb_connection())
+    DBUG_RETURN(1);
+
+  if (lock_type != F_UNLCK)
+  {
+    if (!thd->transaction.ndb_lock_count++)
+    {
+      PRINT_OPTION_FLAGS(thd);
+
+      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK)))
+      {
+        // Autocommit transaction
+        DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
+        DBUG_PRINT("trans",("Starting transaction stmt"));
+
+        trans= m_ndb->startTransaction();
+        if (trans == NULL)
+        {
+          thd->transaction.ndb_lock_count--;  // We didn't get the lock
+          ERR_RETURN(m_ndb->getNdbError());
+        }
+        thd->transaction.stmt.ndb_tid= trans;
+      }
+      else
+      {
+        if (!thd->transaction.all.ndb_tid)
+        {
+          // Not an autocommit transaction
+          // A "master" transaction has not been started yet
+          DBUG_PRINT("trans",("starting transaction, all"));
+
+          trans= m_ndb->startTransaction();
+          if (trans == NULL)
+          {
+            thd->transaction.ndb_lock_count--;  // We didn't get the lock
+            ERR_RETURN(m_ndb->getNdbError());
+          }
+
+          /*
+            If this is the start of a LOCK TABLE, a table lock
+            should be taken on the table in NDB
+
+            Check if it should be a read or write lock
+          */
+          if (thd->options & (OPTION_TABLE_LOCK))
+          {
+            //lockThisTable();
+            DBUG_PRINT("info", ("Locking the table..." ));
+          }
+
+          thd->transaction.all.ndb_tid= trans;
+        }
+      }
+    }
+    /*
+      This is the place to make sure this handler instance
+      has a started transaction.
+
+      The transaction is started by the first handler on which
+      MySQL Server calls external lock
+
+      Other handlers in the same stmt or transaction should use
+      the same NDB transaction. This is done by setting up the m_active_trans
+      pointer to point to the NDB transaction.
+    */
+
+    m_active_trans= thd->transaction.all.ndb_tid ?
+      (NdbConnection*)thd->transaction.all.ndb_tid:
+      (NdbConnection*)thd->transaction.stmt.ndb_tid;
+    DBUG_ASSERT(m_active_trans);
+
+  }
+  else
+  {
+    if (!--thd->transaction.ndb_lock_count)
+    {
+      DBUG_PRINT("trans", ("Last external_lock"));
+      PRINT_OPTION_FLAGS(thd);
+
+      if (thd->transaction.stmt.ndb_tid)
+      {
+        /*
+          Unlock is done without a transaction commit / rollback.
+          This happens if the thread didn't update any rows.
+          We must in this case close the transaction to release resources
+        */
+        DBUG_PRINT("trans",("ending non-updating transaction"));
+        m_ndb->closeTransaction(m_active_trans);
+        thd->transaction.stmt.ndb_tid= 0;
+      }
+    }
+    m_active_trans= NULL;
+  }
+  DBUG_RETURN(error);
+}
+
+/*
+  When using LOCK TABLES, external_lock is only called when the actual
+  TABLE LOCK is taken.
+  Under LOCK TABLES, each used table will force a call to start_stmt.
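+
+  (Editorial note: each such statement gets a "hupp" transaction coupled
+   to the transaction that holds the table lock; the body below does
+
+     trans= m_ndb->hupp((NdbConnection*)thd->transaction.all.ndb_tid);
+     thd->transaction.stmt.ndb_tid= trans;
+
+   so the statement shares the lock holder's transaction context.)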
+*/ + +int ha_ndbcluster::start_stmt(THD *thd) +{ + int error=0; + DBUG_ENTER("start_stmt"); + PRINT_OPTION_FLAGS(thd); + + NdbConnection *trans= (NdbConnection*)thd->transaction.stmt.ndb_tid; + if (!trans){ + DBUG_PRINT("trans",("Starting transaction stmt")); + + NdbConnection *tablock_trans= + (NdbConnection*)thd->transaction.all.ndb_tid; + DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans)); + DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans); + if (trans == NULL) + ERR_RETURN(m_ndb->getNdbError()); + thd->transaction.stmt.ndb_tid= trans; + } + m_active_trans= trans; + + DBUG_RETURN(error); +} + + +/* + Commit a transaction started in NDB + */ + +int ndbcluster_commit(THD *thd, void *ndb_transaction) +{ + int res= 0; + Ndb *ndb= (Ndb*)thd->transaction.ndb; + NdbConnection *trans= (NdbConnection*)ndb_transaction; + + DBUG_ENTER("ndbcluster_commit"); + DBUG_PRINT("transaction",("%s", + trans == thd->transaction.stmt.ndb_tid ? + "stmt" : "all")); + DBUG_ASSERT(ndb && trans); + + if (trans->execute(Commit) != 0) + { + const NdbError err= trans->getNdbError(); + ERR_PRINT(err); + res= ndb_to_mysql_error(&err); + } + ndb->closeTransaction(trans); + DBUG_RETURN(res); +} + + +/* + Rollback a transaction started in NDB + */ + +int ndbcluster_rollback(THD *thd, void *ndb_transaction) +{ + int res= 0; + Ndb *ndb= (Ndb*)thd->transaction.ndb; + NdbConnection *trans= (NdbConnection*)ndb_transaction; + + DBUG_ENTER("ndbcluster_rollback"); + DBUG_PRINT("transaction",("%s", + trans == thd->transaction.stmt.ndb_tid ? + "stmt" : "all")); + DBUG_ASSERT(ndb && trans); + + if (trans->execute(Rollback) != 0) + { + const NdbError err= trans->getNdbError(); + ERR_PRINT(err); + res= ndb_to_mysql_error(&err); + } + ndb->closeTransaction(trans); + DBUG_RETURN(0); +} + + +/* + Map MySQL type to the corresponding NDB type + */ + +inline NdbDictionary::Column::Type +mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg) +{ + switch(mysql_type) { + case MYSQL_TYPE_DECIMAL: + return NdbDictionary::Column::Char; + case MYSQL_TYPE_TINY: + return (unsigned_flg) ? + NdbDictionary::Column::Tinyunsigned : + NdbDictionary::Column::Tinyint; + case MYSQL_TYPE_SHORT: + return (unsigned_flg) ? + NdbDictionary::Column::Smallunsigned : + NdbDictionary::Column::Smallint; + case MYSQL_TYPE_LONG: + return (unsigned_flg) ? + NdbDictionary::Column::Unsigned : + NdbDictionary::Column::Int; + case MYSQL_TYPE_TIMESTAMP: + return NdbDictionary::Column::Unsigned; + case MYSQL_TYPE_LONGLONG: + return (unsigned_flg) ? + NdbDictionary::Column::Bigunsigned : + NdbDictionary::Column::Bigint; + case MYSQL_TYPE_INT24: + return (unsigned_flg) ? 
+      NdbDictionary::Column::Mediumunsigned :
+      NdbDictionary::Column::Mediumint;
+  case MYSQL_TYPE_FLOAT:
+    return NdbDictionary::Column::Float;
+  case MYSQL_TYPE_DOUBLE:
+    return NdbDictionary::Column::Double;
+  case MYSQL_TYPE_DATETIME :
+    return NdbDictionary::Column::Datetime;
+  case MYSQL_TYPE_DATE :
+  case MYSQL_TYPE_NEWDATE :
+  case MYSQL_TYPE_TIME :
+  case MYSQL_TYPE_YEAR :
+    // Missing NDB data types, mapped to char
+    return NdbDictionary::Column::Char;
+  case MYSQL_TYPE_ENUM :
+    return NdbDictionary::Column::Char;
+  case MYSQL_TYPE_SET :
+    return NdbDictionary::Column::Char;
+  case MYSQL_TYPE_TINY_BLOB :
+  case MYSQL_TYPE_MEDIUM_BLOB :
+  case MYSQL_TYPE_LONG_BLOB :
+  case MYSQL_TYPE_BLOB :
+    return NdbDictionary::Column::Blob;
+  case MYSQL_TYPE_VAR_STRING :
+    return NdbDictionary::Column::Varchar;
+  case MYSQL_TYPE_STRING :
+    return NdbDictionary::Column::Char;
+  case MYSQL_TYPE_NULL :
+  case MYSQL_TYPE_GEOMETRY :
+    return NdbDictionary::Column::Undefined;
+  }
+  return NdbDictionary::Column::Undefined;
+}
+
+
+/*
+  Create a table in NDB Cluster
+ */
+
+int ha_ndbcluster::create(const char *name,
+                          TABLE *form,
+                          HA_CREATE_INFO *info)
+{
+  NDBTAB tab;
+  NdbDictionary::Column::Type ndb_type;
+  NDBCOL col;
+  uint pack_length, length, i;
+  int res;
+  const void *data, *pack_data;
+  const char **key_name= form->keynames.type_names;
+  char name2[FN_HEADLEN];
+
+  DBUG_ENTER("create");
+  DBUG_PRINT("enter", ("name: %s", name));
+  fn_format(name2, name, "", "",2);       // Remove the .frm extension
+  set_dbname(name2);
+  set_tabname(name2);
+
+  DBUG_PRINT("table", ("name: %s", m_tabname));
+  tab.setName(m_tabname);
+  tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
+
+  // Save frm data for this table
+  if (readfrm(name, &data, &length))
+    DBUG_RETURN(1);
+  if (packfrm(data, length, &pack_data, &pack_length))
+    DBUG_RETURN(2);
+
+  DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
+  tab.setFrm(pack_data, pack_length);
+  my_free((char*)data, MYF(0));
+  my_free((char*)pack_data, MYF(0));
+
+  for (i= 0; i < form->fields; i++)
+  {
+    Field *field= form->field[i];
+    ndb_type= mysql_to_ndb_type(field->real_type(),
+                                field->flags & UNSIGNED_FLAG);
+    DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
+                        field->field_name, field->real_type(),
+                        field->pack_length()));
+    col.setName(field->field_name);
+    col.setType(ndb_type);
+    if ((ndb_type == NdbDictionary::Column::Char) ||
+        (ndb_type == NdbDictionary::Column::Varchar))
+      col.setLength(field->pack_length());
+    else
+      col.setLength(1);
+    col.setNullable(field->maybe_null());
+    col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
+    if (field->flags & AUTO_INCREMENT_FLAG)
+    {
+      DBUG_PRINT("info", ("Found auto_increment key"));
+      col.setAutoIncrement(TRUE);
+      ulonglong value = info->auto_increment_value ?
+        info->auto_increment_value - 1 :
+        (ulonglong) 0;
+      DBUG_PRINT("info", ("initial value=%lu", (ulong) value));
+//      col.setInitialAutIncValue(value);
+    }
+    else
+      col.setAutoIncrement(false);
+
+    tab.addColumn(col);
+  }
+
+  // No primary key, create shadow key as 64 bit, auto increment
+  if (form->primary_key == MAX_KEY)
+  {
+    DBUG_PRINT("info", ("Generating shadow key"));
+    col.setName("$PK");
+    col.setType(NdbDictionary::Column::Bigunsigned);
+    col.setLength(1);
+    col.setNullable(false);
+    col.setPrimaryKey(TRUE);
+    col.setAutoIncrement(TRUE);
+    tab.addColumn(col);
+  }
+
+  my_errno= 0;
+  if (check_ndb_connection())
+  {
+    my_errno= HA_ERR_NO_CONNECTION;
+    DBUG_RETURN(my_errno);
+  }
+
+  // Create the table in NDB
+  NDBDICT *dict= m_ndb->getDictionary();
+  if (dict->createTable(tab))
+  {
+    const NdbError err= dict->getNdbError();
+    ERR_PRINT(err);
+    my_errno= ndb_to_mysql_error(&err);
+    DBUG_RETURN(my_errno);
+  }
+  DBUG_PRINT("info", ("Table %s/%s created successfully",
+                      m_dbname, m_tabname));
+
+  // Fetch table from NDB, check that it exists
+  const NDBTAB *tab2= dict->getTable(m_tabname);
+  if (tab2 == NULL)
+  {
+    const NdbError err= dict->getNdbError();
+    ERR_PRINT(err);
+    my_errno= ndb_to_mysql_error(&err);
+    DBUG_RETURN(my_errno);
+  }
+
+  // Create secondary indexes
+  for (i= 0; i < form->keys; i++)
+  {
+    DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i]));
+    if (i == form->primary_key)
+    {
+      DBUG_PRINT("info", ("Skipping it, PK already created"));
+      continue;
+    }
+
+    DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i]));
+    res= create_index(key_name[i],
+                      form->key_info + i);
+    switch(res){
+    case 0:
+      // OK
+      break;
+    default:
+      DBUG_PRINT("error", ("Failed to create index %u", i));
+      drop_table();
+      my_errno= res;
+      goto err_end;
+    }
+  }
+
+err_end:
+  DBUG_RETURN(my_errno);
+}
+
+
+/*
+  Create an index in NDB Cluster
+ */
+
+int ha_ndbcluster::create_index(const char *name,
+                                KEY *key_info)
+{
+  NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+  KEY_PART_INFO *key_part= key_info->key_part;
+  KEY_PART_INFO *end= key_part + key_info->key_parts;
+
+  DBUG_ENTER("create_index");
+  DBUG_PRINT("enter", ("name: %s ", name));
+
+  // Check that an index with the same name does not already exist
+  if (dict->getIndex(name, m_tabname))
+    ERR_RETURN(dict->getNdbError());
+
+  NdbDictionary::Index ndb_index(name);
+  if (key_info->flags & HA_NOSAME)
+    ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
+  else
+  {
+    ndb_index.setType(NdbDictionary::Index::OrderedIndex);
+    // TODO Only temporary ordered indexes supported
+    ndb_index.setLogging(false);
+  }
+  ndb_index.setTable(m_tabname);
+
+  for (; key_part != end; key_part++)
+  {
+    Field *field= key_part->field;
+    DBUG_PRINT("info", ("attr: %s", field->field_name));
+    ndb_index.addColumnName(field->field_name);
+  }
+
+  if (dict->createIndex(ndb_index))
+    ERR_RETURN(dict->getNdbError());
+
+  // Success
+  DBUG_PRINT("info", ("Created index %s", name));
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Rename a table in NDB Cluster
+*/
+
+int ha_ndbcluster::rename_table(const char *from, const char *to)
+{
+  char new_tabname[FN_HEADLEN];
+
+  DBUG_ENTER("ha_ndbcluster::rename_table");
+  set_dbname(from);
+  set_tabname(from);
+  set_tabname(to, new_tabname);
+
+  if (check_ndb_connection()) {
+    my_errno= HA_ERR_NO_CONNECTION;
+    DBUG_RETURN(my_errno);
+  }
+
+  int result= alter_table_name(m_tabname, new_tabname);
+  if (result == 0)
+    set_tabname(to);
+
+  DBUG_RETURN(result);
+}
+
+
+/*
+  Rename a table in NDB Cluster using alter table
+ */
+
+int 
ha_ndbcluster::alter_table_name(const char *from, const char *to)
+{
+  NDBDICT *dict= m_ndb->getDictionary();
+  const NDBTAB *orig_tab;
+  DBUG_ENTER("alter_table_name");
+  DBUG_PRINT("enter", ("Renaming %s to %s", from, to));
+
+  if (!(orig_tab= dict->getTable(from)))
+    ERR_RETURN(dict->getNdbError());
+
+  NdbDictionary::Table copy_tab= dict->getTableForAlteration(from);
+  copy_tab.setName(to);
+  if (dict->alterTable(copy_tab) != 0)
+    ERR_RETURN(dict->getNdbError());
+
+  m_table= NULL;
+
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Delete a table from NDB Cluster
+ */
+
+int ha_ndbcluster::delete_table(const char *name)
+{
+  DBUG_ENTER("delete_table");
+  DBUG_PRINT("enter", ("name: %s", name));
+  set_dbname(name);
+  set_tabname(name);
+
+  if (check_ndb_connection())
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  DBUG_RETURN(drop_table());
+}
+
+
+/*
+  Drop a table in NDB Cluster
+ */
+
+int ha_ndbcluster::drop_table()
+{
+  NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+
+  DBUG_ENTER("drop_table");
+  DBUG_PRINT("enter", ("Deleting %s", m_tabname));
+
+  if (dict->dropTable(m_tabname))
+  {
+    const NdbError err= dict->getNdbError();
+    if (err.code == 709)
+      ; // 709: No such table existed
+    else
+      ERR_RETURN(dict->getNdbError());
+  }
+  release_metadata();
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Drop a database in NDB Cluster
+ */
+
+int ndbcluster_drop_database(const char *path)
+{
+  DBUG_ENTER("ndbcluster_drop_database");
+  // TODO drop all tables for this database
+  DBUG_RETURN(1);
+}
+
+
+longlong ha_ndbcluster::get_auto_increment()
+{
+  // NOTE If the number of values to be inserted is known,
+  // the autoincrement cache could be used here
+  Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+  return (longlong)auto_value;
+}
+
+
+/*
+  Constructor for the NDB Cluster table handler
+ */
+
+ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
+  handler(table_arg),
+  m_active_trans(NULL),
+  m_active_cursor(NULL),
+  m_ndb(NULL),
+  m_table(NULL),
+  m_table_flags(HA_REC_NOT_IN_SEQ |
+                HA_KEYPOS_TO_RNDPOS |
+                HA_NOT_EXACT_COUNT |
+                HA_NO_WRITE_DELAYED |
+                HA_NO_PREFIX_CHAR_KEYS |
+                HA_NO_BLOBS |
+                HA_DROP_BEFORE_CREATE |
+                HA_NOT_READ_AFTER_KEY),
+  m_use_write(false)
+{
+
+  DBUG_ENTER("ha_ndbcluster");
+
+  m_tabname[0]= '\0';
+  m_dbname[0]= '\0';
+
+  // TODO Adjust number of records and other parameters for proper
+  // selection of scan/pk access
+  records= 100;
+  block_size= 1024;
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Destructor for NDB Cluster table handler
+ */
+
+ha_ndbcluster::~ha_ndbcluster()
+{
+  DBUG_ENTER("~ha_ndbcluster");
+
+  release_metadata();
+
+  // Check for open cursor/transaction
+  DBUG_ASSERT(m_active_cursor == NULL);
+  DBUG_ASSERT(m_active_trans == NULL);
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Open a table for further use
+  - fetch metadata for this table from NDB
+  - check that table exists
+*/
+
+int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
+{
+  KEY *key;
+  DBUG_ENTER("open");
+  DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
+                       name, mode, test_if_locked));
+
+  // Set up ref_length to make room for the whole
+  // primary key to be written in the ref variable
+
+  if (table->primary_key != MAX_KEY)
+  {
+    key= table->key_info+table->primary_key;
+    ref_length= key->key_length;
+    DBUG_PRINT("info", (" ref_length: %d", ref_length));
+  }
+  // Init table lock structure
+  if (!(m_share=get_share(name)))
+    DBUG_RETURN(1);
+  thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
+
+  set_dbname(name);
+  set_tabname(name);
+
+  if (check_ndb_connection())
+    
DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  DBUG_RETURN(get_metadata(name));
+}
+
+
+/*
+  Close the table
+  - release resources set up by open()
+ */
+
+int ha_ndbcluster::close(void)
+{
+  DBUG_ENTER("close");
+  free_share(m_share);
+  release_metadata();
+  m_ndb= NULL;
+  DBUG_RETURN(0);
+}
+
+
+Ndb* ha_ndbcluster::seize_ndb()
+{
+  Ndb* ndb;
+  DBUG_ENTER("seize_ndb");
+
+#ifdef USE_NDB_POOL
+  // Seize from pool
+  ndb= Ndb::seize();
+#else
+  ndb= new Ndb("");
+#endif
+  if (ndb->init(NDB_MAX_TRANSACTIONS) != 0)
+  {
+    ERR_PRINT(ndb->getNdbError());
+    /*
+      TODO
+      Alt.1 If init fails because of too many allocated Ndb objects,
+            wait on a condition for an Ndb object to be released.
+      Alt.2 Seize/release from pool, wait until next release
+    */
+    delete ndb;
+    ndb= NULL;
+  }
+  DBUG_RETURN(ndb);
+}
+
+
+void ha_ndbcluster::release_ndb(Ndb* ndb)
+{
+  DBUG_ENTER("release_ndb");
+#ifdef USE_NDB_POOL
+  // Release to pool
+  Ndb::release(ndb);
+#else
+  delete ndb;
+#endif
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  If this thread already has an Ndb object allocated
+  in the current THD, reuse it. Otherwise
+  seize an Ndb object, assign it to the current THD and use it.
+
+  Having an Ndb object also means that a connection to
+  NDB cluster has been opened. The connection is
+  checked.
+
+*/
+
+int ha_ndbcluster::check_ndb_connection()
+{
+  THD* thd= current_thd;
+  Ndb* ndb;
+  DBUG_ENTER("check_ndb_connection");
+
+  if (!thd->transaction.ndb)
+  {
+    ndb= seize_ndb();
+    if (!ndb)
+      DBUG_RETURN(2);
+    thd->transaction.ndb= ndb;
+  }
+  m_ndb= (Ndb*)thd->transaction.ndb;
+  m_ndb->setDatabaseName(m_dbname);
+  if (m_ndb->waitUntilReady() != 0)
+  {
+    DBUG_PRINT("error", ("Ndb was not ready"));
+    DBUG_RETURN(3);
+  }
+  DBUG_RETURN(0);
+}
+
+void ndbcluster_close_connection(THD *thd)
+{
+  Ndb* ndb;
+  DBUG_ENTER("ndbcluster_close_connection");
+  ndb= (Ndb*)thd->transaction.ndb;
+  ha_ndbcluster::release_ndb(ndb);
+  thd->transaction.ndb= NULL;
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Try to discover one table from NDB
+ */
+
+int ndbcluster_discover(const char *dbname, const char *name,
+                        const void** frmblob, uint* frmlen)
+{
+  uint len;
+  const void* data;
+  const NDBTAB* tab;
+  DBUG_ENTER("ndbcluster_discover");
+  DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+
+  Ndb ndb(dbname);
+  // Fail if either init or the wait for readiness fails
+  if ((ndb.init() != 0) || (ndb.waitUntilReady() != 0))
+    ERR_RETURN(ndb.getNdbError());
+
+  if (!(tab= ndb.getDictionary()->getTable(name)))
+  {
+    DBUG_PRINT("info", ("Table %s not found", name));
+    DBUG_RETURN(1);
+  }
+
+  DBUG_PRINT("info", ("Found table %s", tab->getName()));
+
+  len= tab->getFrmLength();
+  if (len == 0 || tab->getFrmData() == NULL)
+  {
+    DBUG_PRINT("No frm data found",
+               ("Table is probably created via NdbApi"));
+    DBUG_RETURN(2);
+  }
+
+  if (unpackfrm(&data, &len, tab->getFrmData()))
+    DBUG_RETURN(3);
+
+  *frmlen= len;
+  *frmblob= data;
+
+  DBUG_RETURN(0);
+}
+
+static Ndb* g_ndb= NULL;
+
+#ifdef USE_DISCOVER_ON_STARTUP
+/*
+  Discover tables from NDB Cluster
+  - fetch a list of tables from NDB
+  - store the frm file for each table on disk,
+    if the table has an attached frm file and
+    the database of the table exists
+*/
+
+int ndb_discover_tables()
+{
+  uint i;
+  NdbDictionary::Dictionary::List list;
+  NdbDictionary::Dictionary* dict;
+  char path[FN_REFLEN];
+  DBUG_ENTER("ndb_discover_tables");
+
+  /* List tables in NDB Cluster kernel */
+  dict= g_ndb->getDictionary();
+  if (dict->listObjects(list,
+                        NdbDictionary::Object::UserTable) != 0)
+    ERR_RETURN(g_ndb->getNdbError());
+
+  for (i= 0 ; i < list.count ; i++)
+  {
+    NdbDictionary::Dictionary::List::Element& 
t= list.elements[i];
+
+    DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
+    if (create_table_from_handler(t.database, t.name, true))
+      DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+  }
+  DBUG_RETURN(0);
+}
+#endif
+
+
+/*
+  Initialise all global variables before creating
+  an NDB Cluster table handler
+ */
+
+bool ndbcluster_init()
+{
+  DBUG_ENTER("ndbcluster_init");
+  // Create an Ndb object to open the connection to NDB
+  g_ndb= new Ndb("sys");
+  if (g_ndb->init() != 0)
+  {
+    ERR_PRINT (g_ndb->getNdbError());
+    DBUG_RETURN(TRUE);
+  }
+  if (g_ndb->waitUntilReady() != 0)
+  {
+    ERR_PRINT (g_ndb->getNdbError());
+    DBUG_RETURN(TRUE);
+  }
+  (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
+                   (hash_get_key) ndbcluster_get_key,0,0);
+  pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+  ndbcluster_inited= 1;
+#ifdef USE_DISCOVER_ON_STARTUP
+  if (ndb_discover_tables() != 0)
+    DBUG_RETURN(TRUE);
+#endif
+  DBUG_RETURN(false);
+}
+
+
+/*
+  End use of the NDB Cluster table handler
+  - free all global variables allocated by
+    ndbcluster_init()
+*/
+
+bool ndbcluster_end()
+{
+  DBUG_ENTER("ndbcluster_end");
+  delete g_ndb;
+  g_ndb= NULL;
+  if (!ndbcluster_inited)
+    DBUG_RETURN(0);
+  hash_free(&ndbcluster_open_tables);
+#ifdef USE_NDB_POOL
+  ndb_pool_release();
+#endif
+  pthread_mutex_destroy(&ndbcluster_mutex);
+  ndbcluster_inited= 0;
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Set m_tabname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_tabname(const char *path_name)
+{
+  char *end, *ptr;
+
+  /* Scan name from the end */
+  end= strend(path_name)-1;
+  ptr= end;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  uint name_len= end - ptr;
+  memcpy(m_tabname, ptr + 1, end - ptr);
+  m_tabname[name_len]= '\0';
+#ifdef __WIN__
+  /* Put to lower case */
+  ptr= m_tabname;
+
+  while (*ptr != '\0') {
+    *ptr = tolower(*ptr);
+    ptr++;
+  }
+#endif
+}
+
+/*
+  Set a given location from full pathname to table file
+ */
+
+void
+ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
+{
+  char *end, *ptr;
+
+  /* Scan name from the end */
+  end = strend(path_name)-1;
+  ptr = end;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  uint name_len = end - ptr;
+  memcpy(tabname, ptr + 1, end - ptr);
+  tabname[name_len] = '\0';
+#ifdef __WIN__
+  /* Put to lower case */
+  ptr = tabname;
+
+  while (*ptr != '\0') {
+    *ptr= tolower(*ptr);
+    ptr++;
+  }
+#endif
+}
+
+
+/*
+  Set m_dbname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_dbname(const char *path_name)
+{
+  char *end, *ptr;
+
+  /* Scan name from the end */
+  ptr= strend(path_name)-1;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  ptr--;
+  end= ptr;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  uint name_len= end - ptr;
+  memcpy(m_dbname, ptr + 1, name_len);
+  m_dbname[name_len]= '\0';
+#ifdef __WIN__
+  /* Put to lower case */
+
+  ptr= m_dbname;
+
+  while (*ptr != '\0') {
+    *ptr= tolower(*ptr);
+    ptr++;
+  }
+#endif
+}
+
+
+ha_rows
+ha_ndbcluster::records_in_range(int inx,
+                                const byte *start_key,uint start_key_len,
+                                enum ha_rkey_function start_search_flag,
+                                const byte *end_key,uint end_key_len,
+                                enum ha_rkey_function end_search_flag)
+{
+  ha_rows records= 10;
+  KEY* key_info= table->key_info + inx;
+  uint key_length= key_info->key_length;
+
+  DBUG_ENTER("records_in_range");
+  DBUG_PRINT("enter", ("inx: %d", inx));
+  DBUG_PRINT("enter", ("start_key: %x, start_key_len: 
%d", start_key, start_key_len)); + DBUG_PRINT("enter", ("start_search_flag: %d", start_search_flag)); + DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len)); + DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag)); + + /* + Check that start_key_len is equal to + the length of the used index and + prevent partial scan/read of hash indexes by returning HA_POS_ERROR + */ + NDB_INDEX_TYPE idx_type= get_index_type(inx); + if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) && + start_key_len < key_length) + { + DBUG_PRINT("warning", ("Tried to use index which required" + "full key length: %d, HA_POS_ERROR", + key_length)); + records= HA_POS_ERROR; + } + DBUG_RETURN(records); +} + + +/* + Handling the shared NDB_SHARE structure that is needed to + provide table locking. + It's also used for sharing data with other NDB handlers + in the same MySQL Server. There is currently not much + data we want to or can share. + */ + +static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length, + my_bool not_used __attribute__((unused))) +{ + *length=share->table_name_length; + return (byte*) share->table_name; +} + +static NDB_SHARE* get_share(const char *table_name) +{ + NDB_SHARE *share; + pthread_mutex_lock(&ndbcluster_mutex); + uint length=(uint) strlen(table_name); + if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables, + (byte*) table_name, + length))) + { + if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1, + MYF(MY_WME | MY_ZEROFILL)))) + { + share->table_name_length=length; + share->table_name=(char*) (share+1); + strmov(share->table_name,table_name); + if (my_hash_insert(&ndbcluster_open_tables, (byte*) share)) + { + pthread_mutex_unlock(&ndbcluster_mutex); + my_free((gptr) share,0); + return 0; + } + thr_lock_init(&share->lock); + pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST); + } + } + share->use_count++; + pthread_mutex_unlock(&ndbcluster_mutex); + return share; +} + + +static void free_share(NDB_SHARE *share) +{ + pthread_mutex_lock(&ndbcluster_mutex); + if (!--share->use_count) + { + hash_delete(&ndbcluster_open_tables, (byte*) share); + thr_lock_delete(&share->lock); + pthread_mutex_destroy(&share->mutex); + my_free((gptr) share, MYF(0)); + } + pthread_mutex_unlock(&ndbcluster_mutex); +} + + + +/* + Internal representation of the frm blob + +*/ + +struct frm_blob_struct +{ + struct frm_blob_header + { + uint ver; // Version of header + uint orglen; // Original length of compressed data + uint complen; // Compressed length of data, 0=uncompressed + } head; + char data[1]; +}; + + + +static int packfrm(const void *data, uint len, + const void **pack_data, uint *pack_len) +{ + int error; + ulong org_len, comp_len; + uint blob_len; + frm_blob_struct* blob; + DBUG_ENTER("packfrm"); + DBUG_PRINT("enter", ("data: %x, len: %d", data, len)); + + error= 1; + org_len = len; + if (my_compress((byte*)data, &org_len, &comp_len)) + goto err; + + DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len)); + DBUG_DUMP("compressed", (char*)data, org_len); + + error= 2; + blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len; + if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME)))) + goto err; + + // Store compressed blob in machine independent format + int4store((char*)(&blob->head.ver), 1); + int4store((char*)(&blob->head.orglen), comp_len); + int4store((char*)(&blob->head.complen), org_len); + + // Copy frm data into blob, already in machine independent format + memcpy(blob->data, data, org_len); + + *pack_data = 
blob; + *pack_len = blob_len; + error = 0; + + DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len)); +err: + DBUG_RETURN(error); + +} + + +static int unpackfrm(const void **unpack_data, uint *unpack_len, + const void *pack_data) +{ + const frm_blob_struct *blob = (frm_blob_struct*)pack_data; + byte *data; + ulong complen, orglen, ver; + DBUG_ENTER("unpackfrm"); + DBUG_PRINT("enter", ("pack_data: %x", pack_data)); + + complen= uint4korr((char*)&blob->head.complen); + orglen= uint4korr((char*)&blob->head.orglen); + ver= uint4korr((char*)&blob->head.ver); + + DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d", + ver,complen,orglen)); + DBUG_DUMP("blob->data", (char*) blob->data, complen); + + if (ver != 1) + DBUG_RETURN(1); + if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME)))) + DBUG_RETURN(2); + memcpy(data, blob->data, complen); + + if (my_uncompress(data, &complen, &orglen)) + { + my_free((char*)data, MYF(0)); + DBUG_RETURN(3); + } + + *unpack_data = data; + *unpack_len = complen; + + DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len)); + + DBUG_RETURN(0); +} +#endif /* HAVE_NDBCLUSTER_DB */ diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h new file mode 100755 index 00000000000..ed66d07d79b --- /dev/null +++ b/sql/ha_ndbcluster.h @@ -0,0 +1,218 @@ +/* Copyright (C) 2000-2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + This file defines the NDB Cluster handler: the interface between MySQL and + NDB Cluster +*/ + +/* The class defining a handle to an NDB Cluster table */ + +#ifdef __GNUC__ +#pragma interface /* gcc class implementation */ +#endif + +#include <ndbapi_limits.h> +#include <ndb_types.h> + +class Ndb; // Forward declaration +class NdbOperation; // Forward declaration +class NdbConnection; // Forward declaration +class NdbRecAttr; // Forward declaration +class NdbResultSet; // Forward declaration + +typedef enum ndb_index_type { + UNDEFINED_INDEX = 0, + PRIMARY_KEY_INDEX = 1, + UNIQUE_INDEX = 2, + ORDERED_INDEX = 3 +} NDB_INDEX_TYPE; + + +typedef struct st_ndbcluster_share { + THR_LOCK lock; + pthread_mutex_t mutex; + char *table_name; + uint table_name_length,use_count; +} NDB_SHARE; + +class ha_ndbcluster: public handler +{ + public: + ha_ndbcluster(TABLE *table); + ~ha_ndbcluster(); + + int open(const char *name, int mode, uint test_if_locked); + int close(void); + + int write_row(byte *buf); + int update_row(const byte *old_data, byte *new_data); + int delete_row(const byte *buf); + int index_init(uint index); + int index_end(); + int index_read(byte *buf, const byte *key, uint key_len, + enum ha_rkey_function find_flag); + int index_read_idx(byte *buf, uint index, const byte *key, uint key_len, + enum ha_rkey_function find_flag); + int index_next(byte *buf); + int index_prev(byte *buf); + int index_first(byte *buf); + int index_last(byte *buf); + int rnd_init(bool scan=1); + int rnd_end(); + int rnd_next(byte *buf); + int rnd_pos(byte *buf, byte *pos); + void position(const byte *record); + + void info(uint); + int extra(enum ha_extra_function operation); + int extra_opt(enum ha_extra_function operation, ulong cache_size); + int reset(); + int external_lock(THD *thd, int lock_type); + int start_stmt(THD *thd); + const char * table_type() const { return("ndbcluster");} + const char ** bas_ext() const; + ulong table_flags(void) const { return m_table_flags; } + ulong index_flags(uint idx) const; + uint max_record_length() const { return NDB_MAX_TUPLE_SIZE; }; + uint max_keys() const { return MAX_KEY; } + uint max_key_parts() const { return NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY; }; + uint max_key_length() const { return NDB_MAX_KEY_SIZE;}; + + int rename_table(const char *from, const char *to); + int delete_table(const char *name); + int create(const char *name, TABLE *form, HA_CREATE_INFO *info); + THR_LOCK_DATA **store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type); + + bool low_byte_first() const + { +#ifdef WORDS_BIGENDIAN + return false; +#else + return true; +#endif + } + bool has_transactions() { return true;} + + const char* index_type(uint key_number) { + switch (get_index_type(key_number)) { + case ORDERED_INDEX: + return "BTREE"; + case UNIQUE_INDEX: + case PRIMARY_KEY_INDEX: + default: + return "HASH"; + } + } + + double scan_time(); + ha_rows records_in_range(int inx, + const byte *start_key,uint start_key_len, + enum ha_rkey_function start_search_flag, + const byte *end_key,uint end_key_len, + enum ha_rkey_function end_search_flag); + + + static Ndb* seize_ndb(); + static void release_ndb(Ndb* ndb); + + + private: + int alter_table_name(const char *from, const char *to); + int drop_table(); + int create_index(const char *name, KEY *key_info); + int 
initialize_autoincrement(const void* table); + int get_metadata(const char* path); + void release_metadata(); + const char* get_index_name(uint idx_no) const; + NDB_INDEX_TYPE get_index_type(uint idx_no) const; + NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; + + int pk_read(const byte *key, uint key_len, + byte *buf); + int unique_index_read(const byte *key, uint key_len, + byte *buf); + int ordered_index_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag); + int full_table_scan(byte * buf); + int next_result(byte *buf); +#if 0 + int filtered_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag); +#endif + + void unpack_record(byte *buf); + + void set_dbname(const char *pathname); + void set_tabname(const char *pathname); + void set_tabname(const char *pathname, char *tabname); + + bool set_hidden_key(NdbOperation*, + uint fieldnr, const byte* field_ptr); + int set_ndb_key(NdbOperation*, Field *field, + uint fieldnr, const byte* field_ptr); + int set_ndb_value(NdbOperation*, Field *field, uint fieldnr); + int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr); + int set_primary_key(NdbOperation *op, const byte *key); + int set_primary_key(NdbOperation *op); + int key_cmp(uint keynr, const byte * old_row, const byte * new_row); + void print_results(); + + longlong get_auto_increment(); + + int ndb_err(NdbConnection*); + + private: + int check_ndb_connection(); + + NdbConnection *m_active_trans; + NdbResultSet *m_active_cursor; + Ndb *m_ndb; + void *m_table; + char m_dbname[FN_HEADLEN]; + //char m_schemaname[FN_HEADLEN]; + char m_tabname[FN_HEADLEN]; + ulong m_table_flags; + THR_LOCK_DATA m_lock; + NDB_SHARE *m_share; + NDB_INDEX_TYPE m_indextype[MAX_KEY]; + NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; + bool m_use_write; +}; + +bool ndbcluster_init(void); +bool ndbcluster_end(void); + +int ndbcluster_commit(THD *thd, void* ndb_transaction); +int ndbcluster_rollback(THD *thd, void* ndb_transaction); + +void ndbcluster_close_connection(THD *thd); + +int ndbcluster_discover(const char* dbname, const char* name, + const void** frmblob, uint* frmlen); +int ndbcluster_drop_database(const char* path); + + + + + + + + diff --git a/sql/handler.cc b/sql/handler.cc index 2b22563bc3c..e1ee3315f40 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -37,6 +37,9 @@ #else #define innobase_query_caching_of_table_permitted(X,Y,Z) 1 #endif +#ifdef HAVE_NDBCLUSTER_DB +#include "ha_ndbcluster.h" +#endif #include <myisampack.h> #include <errno.h> @@ -48,7 +51,7 @@ ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count, ha_read_key_count, ha_read_next_count, ha_read_prev_count, ha_read_first_count, ha_read_last_count, ha_commit_count, ha_rollback_count, - ha_read_rnd_count, ha_read_rnd_next_count; + ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count; static SHOW_COMP_OPTION have_yes= SHOW_OPTION_YES; @@ -76,6 +79,10 @@ struct show_table_type_st sys_table_types[]= "Supports transactions and page-level locking", DB_TYPE_BERKELEY_DB}, {"BERKELEYDB",&have_berkeley_db, "Alias for BDB", DB_TYPE_BERKELEY_DB}, + {"NDBCLUSTER", &have_ndbcluster, + "Clustered, fault tolerant memory based tables", DB_TYPE_NDBCLUSTER}, + {"NDB", &have_ndbcluster, + "Alias for NDBCLUSTER", DB_TYPE_NDBCLUSTER}, {NullS, NULL, NullS, DB_TYPE_UNKNOWN} }; @@ -172,6 +179,10 @@ handler *get_new_handler(TABLE *table, enum db_type db_type) case DB_TYPE_INNODB: return new ha_innobase(table); #endif +#ifdef HAVE_NDBCLUSTER_DB + case 
DB_TYPE_NDBCLUSTER:
+    return new ha_ndbcluster(table);
+#endif
   case DB_TYPE_HEAP:
     return new ha_heap(table);
   default: // should never happen
@@ -216,6 +227,18 @@ int ha_init()
     opt_using_transactions=1;
   }
 #endif
+#ifdef HAVE_NDBCLUSTER_DB
+  if (have_ndbcluster == SHOW_OPTION_YES)
+  {
+    if (ndbcluster_init())
+    {
+      have_ndbcluster= SHOW_OPTION_DISABLED;
+      error= 1;
+    }
+    else
+      opt_using_transactions=1;
+  }
+#endif
   return error;
 }
 
@@ -243,6 +266,10 @@ int ha_panic(enum ha_panic_function flag)
   if (have_innodb == SHOW_OPTION_YES)
     error|=innobase_end();
 #endif
+#ifdef HAVE_NDBCLUSTER_DB
+  if (have_ndbcluster == SHOW_OPTION_YES)
+    error|=ndbcluster_end();
+#endif
   return error;
 } /* ha_panic */
 
@@ -252,6 +279,10 @@ void ha_drop_database(char* path)
   if (have_innodb == SHOW_OPTION_YES)
     innobase_drop_database(path);
 #endif
+#ifdef HAVE_NDBCLUSTER_DB
+  if (have_ndbcluster == SHOW_OPTION_YES)
+    ndbcluster_drop_database(path);
+#endif
 }
 
 void ha_close_connection(THD* thd)
@@ -260,6 +291,10 @@
   if (have_innodb == SHOW_OPTION_YES)
     innobase_close_connection(thd);
 #endif
+#ifdef HAVE_NDBCLUSTER_DB
+  if (have_ndbcluster == SHOW_OPTION_YES)
+    ndbcluster_close_connection(thd);
+#endif
 }
 
 /*
@@ -419,6 +454,19 @@ int ha_commit_trans(THD *thd, THD_TRANS* trans)
 			WRITE_CACHE, (my_off_t) 0, 0, 1);
       thd->transaction.trans_log.end_of_file= max_binlog_cache_size;
     }
+#ifdef HAVE_NDBCLUSTER_DB
+    if (trans->ndb_tid)
+    {
+      if ((error=ndbcluster_commit(thd,trans->ndb_tid)))
+      {
+        my_error(ER_ERROR_DURING_COMMIT, MYF(0), error);
+        error=1;
+      }
+      if (trans == &thd->transaction.all)
+        operation_done= transaction_commited= 1;
+      trans->ndb_tid=0;
+    }
+#endif
 #ifdef HAVE_BERKELEY_DB
     if (trans->bdb_tid)
     {
@@ -472,6 +520,18 @@ int ha_rollback_trans(THD *thd, THD_TRANS *trans)
   if (opt_using_transactions)
   {
     bool operation_done=0;
+#ifdef HAVE_NDBCLUSTER_DB
+    if (trans->ndb_tid)
+    {
+      if ((error=ndbcluster_rollback(thd, trans->ndb_tid)))
+      {
+        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), error);
+        error=1;
+      }
+      trans->ndb_tid = 0;
+      operation_done=1;
+    }
+#endif
 #ifdef HAVE_BERKELEY_DB
     if (trans->bdb_tid)
     {
@@ -1151,8 +1211,10 @@ bool handler::caching_allowed(THD* thd, char* table_key,
 ** Some general functions that isn't in the handler class
 ****************************************************************************/
 
- /* Initiates table-file and calls apropriate database-creator */
- /* Returns 1 if something got wrong */
+/*
+  Initiates table-file and calls appropriate database-creator
+  Returns 1 if something went wrong
+*/
 
 int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
 		    bool update_create_info)
@@ -1168,7 +1230,7 @@ int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
   {
     update_create_info_from_table(create_info, &table);
     if (table.file->table_flags() & HA_DROP_BEFORE_CREATE)
-      table.file->delete_table(name);	// Needed for BDB tables
+      table.file->delete_table(name);
   }
   if (lower_case_table_names == 2 &&
       !(table.file->table_flags() & HA_FILE_BASED))
@@ -1290,6 +1352,26 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
 
 
 /*
+  Try to discover one table from handler(s)
+*/
+
+int ha_discover(const char* dbname, const char* name,
+		const void** frmblob, uint* frmlen)
+{
+  int error= 1;   // Table does not exist in any handler
+  DBUG_ENTER("ha_discover");
+  DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+#ifdef HAVE_NDBCLUSTER_DB
+  if (have_ndbcluster == SHOW_OPTION_YES)
+    error= ndbcluster_discover(dbname, name, frmblob, frmlen);
+#endif
+  if (!error)
+    
statistic_increment(ha_discover_count,&LOCK_status); + DBUG_RETURN(error); +} + + +/* Read first row between two ranges. Store ranges for future calls to read_range_next @@ -1425,3 +1507,5 @@ int handler::compare_key(key_range *range) } return key_compare_result_on_equal; } + + diff --git a/sql/handler.h b/sql/handler.h index 4cb6ab86a37..a5d4f78ef29 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -28,7 +28,8 @@ #define NO_HASH /* Not yet implemented */ #endif -#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB) +#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB) || \ + defined(HAVE_NDBCLUSTER_DB) #define USING_TRANSACTIONS #endif @@ -80,7 +81,6 @@ #define HA_FILE_BASED (1 << 26) - /* bits in index_flags(index_number) for what you can do with index */ #define HA_WRONG_ASCII_ORDER 1 /* Can't use sorting through key */ #define HA_READ_NEXT 2 /* Read next record with same key */ @@ -141,12 +141,17 @@ #define HA_CACHE_TBL_ASKTRANSACT 1 #define HA_CACHE_TBL_TRANSACT 2 -enum db_type { DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1, - DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM, - DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM, - DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM, - DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB, DB_TYPE_GEMINI, - DB_TYPE_DEFAULT }; +enum db_type +{ + DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1, + DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM, + DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM, + DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM, + DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB, + DB_TYPE_GEMINI, DB_TYPE_NDBCLUSTER, + + DB_TYPE_DEFAULT // Must be last +}; struct show_table_type_st { const char *type; @@ -176,6 +181,7 @@ typedef struct st_thd_trans { void *bdb_tid; void *innobase_tid; bool innodb_active_trans; + void *ndb_tid; } THD_TRANS; enum enum_tx_isolation { ISO_READ_UNCOMMITTED, ISO_READ_COMMITTED, @@ -479,3 +485,5 @@ bool ha_flush_logs(void); int ha_recovery_logging(THD *thd, bool on); int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache); +int ha_discover(const char* dbname, const char* name, + const void** frmblob, uint* frmlen); diff --git a/sql/lex.h b/sql/lex.h index 94ea0295f05..e5bc537c213 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -48,7 +48,7 @@ SYM_GROUP sym_group_rtree= {"RTree keys", "HAVE_RTREE_KEYS"}; */ static SYMBOL symbols[] = { - { "&&", SYM(AND)}, + { "&&", SYM(AND_SYM)}, { "<", SYM(LT)}, { "<=", SYM(LE)}, { "<>", SYM(NE)}, @@ -67,7 +67,7 @@ static SYMBOL symbols[] = { { "ALL", SYM(ALL)}, { "ALTER", SYM(ALTER)}, { "ANALYZE", SYM(ANALYZE_SYM)}, - { "AND", SYM(AND)}, + { "AND", SYM(AND_SYM)}, { "ANY", SYM(ANY_SYM)}, { "AS", SYM(AS)}, { "ASC", SYM(ASC)}, @@ -295,6 +295,8 @@ static SYMBOL symbols[] = { { "NAMES", SYM(NAMES_SYM)}, { "NATIONAL", SYM(NATIONAL_SYM)}, { "NATURAL", SYM(NATURAL)}, + { "NDB", SYM(NDBCLUSTER_SYM)}, + { "NDBCLUSTER", SYM(NDBCLUSTER_SYM)}, { "NCHAR", SYM(NCHAR_SYM)}, { "NEW", SYM(NEW_SYM)}, { "NEXT", SYM(NEXT_SYM)}, @@ -312,7 +314,7 @@ static SYMBOL symbols[] = { { "OPTIMIZE", SYM(OPTIMIZE)}, { "OPTION", SYM(OPTION)}, { "OPTIONALLY", SYM(OPTIONALLY)}, - { "OR", SYM(OR)}, + { "OR", SYM(OR_SYM)}, { "ORDER", SYM(ORDER_SYM)}, { "OUTER", SYM(OUTER)}, { "OUTFILE", SYM(OUTFILE)}, diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 1175b93b9ba..9e7db79a7b2 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -837,7 +837,7 @@ extern ulong server_id, concurrency; extern ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count; extern ulong ha_read_key_count, ha_read_next_count, ha_read_prev_count; 
extern ulong ha_read_first_count, ha_read_last_count; -extern ulong ha_read_rnd_count, ha_read_rnd_next_count; +extern ulong ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count; extern ulong ha_commit_count, ha_rollback_count,table_cache_size; extern ulong max_connections,max_connect_errors, connect_timeout; extern ulong slave_net_timeout; @@ -891,6 +891,7 @@ extern SHOW_VAR init_vars[],status_vars[], internal_vars[]; extern SHOW_COMP_OPTION have_isam; extern SHOW_COMP_OPTION have_innodb; extern SHOW_COMP_OPTION have_berkeley_db; +extern SHOW_COMP_OPTION have_ndbcluster; extern struct system_variables global_system_variables; extern struct system_variables max_system_variables; extern struct rand_struct sql_rand; @@ -960,6 +961,10 @@ int format_number(uint inputflag,uint max_length,my_string pos,uint length, my_string *errpos); int openfrm(const char *name,const char *alias,uint filestat,uint prgflag, uint ha_open_flags, TABLE *outparam); +int readfrm(const char *name, const void** data, uint* length); +int writefrm(const char* name, const void* data, uint len); +int create_table_from_handler(const char *db, const char *name, + bool create_if_found); int closefrm(TABLE *table); db_type get_table_type(const char *name); int read_string(File file, gptr *to, uint length); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 5176ee33a17..2e0e8493aad 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -32,6 +32,9 @@ #ifdef HAVE_ISAM #include "ha_isam.h" #endif +#ifdef HAVE_NDBCLUSTER_DB +#include "ha_ndbcluster.h" +#endif #include <nisam.h> #include <thr_alarm.h> #include <ft_global.h> @@ -261,7 +264,7 @@ my_bool opt_local_infile, opt_external_locking, opt_slave_compressed_protocol; my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_log_slave_updates= 0; -my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam; +my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster; my_bool opt_readonly, use_temp_pool, relay_log_purge; my_bool opt_sync_bdb_logs, opt_sync_frm; my_bool opt_secure_auth= 0; @@ -370,7 +373,7 @@ KEY_CACHE *sql_key_cache; CHARSET_INFO *system_charset_info, *files_charset_info ; CHARSET_INFO *national_charset_info, *table_alias_charset; -SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam; +SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam, have_ndbcluster; SHOW_COMP_OPTION have_raid, have_openssl, have_symlink, have_query_cache; SHOW_COMP_OPTION have_crypt, have_compress; @@ -3625,7 +3628,7 @@ enum options_mysqld OPT_INNODB_FAST_SHUTDOWN, OPT_INNODB_FILE_PER_TABLE, OPT_SAFE_SHOW_DB, - OPT_INNODB, OPT_ISAM, OPT_SKIP_SAFEMALLOC, + OPT_INNODB, OPT_ISAM, OPT_NDBCLUSTER, OPT_SKIP_SAFEMALLOC, OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, @@ -4158,6 +4161,13 @@ Disable with --skip-innodb (will save memory).", Disable with --skip-isam.", (gptr*) &opt_isam, (gptr*) &opt_isam, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0}, +#ifdef HAVE_NDBCLUSTER_DB + {"ndbcluster", OPT_NDBCLUSTER, "Enable NDB Cluster (if this version of MySQL +supports it). 
\ +Disable with --skip-ndbcluster (will save memory).", + (gptr*) &opt_ndbcluster, (gptr*) &opt_ndbcluster, 0, GET_BOOL, NO_ARG, 1, 0, 0, + 0, 0, 0}, +#endif {"skip-locking", OPT_SKIP_LOCK, "Deprecated option, use --skip-external-locking instead.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, @@ -4828,6 +4838,7 @@ struct show_var_st status_vars[]= { {"Handler_rollback", (char*) &ha_rollback_count, SHOW_LONG}, {"Handler_update", (char*) &ha_update_count, SHOW_LONG}, {"Handler_write", (char*) &ha_write_count, SHOW_LONG}, + {"Handler_discover", (char*) &ha_discover_count, SHOW_LONG}, {"Key_blocks_not_flushed", (char*) &dflt_key_cache_var.global_blocks_changed, SHOW_KEY_CACHE_LONG}, {"Key_blocks_used", (char*) &dflt_key_cache_var.global_blocks_used, @@ -5121,6 +5132,11 @@ static void mysql_init_variables(void) #else have_isam=SHOW_OPTION_NO; #endif +#ifdef HAVE_NDBCLUSTER_DB + have_ndbcluster=SHOW_OPTION_YES; +#else + have_ndbcluster=SHOW_OPTION_NO; +#endif #ifdef USE_RAID have_raid=SHOW_OPTION_YES; #else @@ -5591,6 +5607,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), have_isam= SHOW_OPTION_DISABLED; #endif break; + case OPT_NDBCLUSTER: +#ifdef HAVE_NDBCLUSTER_DB + if (opt_ndbcluster) + have_ndbcluster=SHOW_OPTION_YES; + else + have_ndbcluster=SHOW_OPTION_DISABLED; +#endif + break; case OPT_INNODB: #ifdef HAVE_INNOBASE_DB if (opt_innodb) diff --git a/sql/set_var.cc b/sql/set_var.cc index 5ccda5c7052..0cf4d2d2691 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -59,6 +59,9 @@ #ifdef HAVE_INNOBASE_DB #include "ha_innodb.h" #endif +#ifdef HAVE_NDBCLUSTER_DB +#include "ha_ndbcluster.h" +#endif static HASH system_variable_hash; const char *bool_type_names[]= { "OFF", "ON", NullS }; @@ -638,6 +641,7 @@ struct show_var_st init_vars[]= { {"have_crypt", (char*) &have_crypt, SHOW_HAVE}, {"have_innodb", (char*) &have_innodb, SHOW_HAVE}, {"have_isam", (char*) &have_isam, SHOW_HAVE}, + {"have_ndbcluster", (char*) &have_ndbcluster, SHOW_HAVE}, {"have_openssl", (char*) &have_openssl, SHOW_HAVE}, {"have_query_cache", (char*) &have_query_cache, SHOW_HAVE}, {"have_raid", (char*) &have_raid, SHOW_HAVE}, diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 8c4571dd540..d24b13ff96f 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -1317,18 +1317,34 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db, { char path[FN_REFLEN]; int error; + uint discover_retry_count= 0; DBUG_ENTER("open_unireg_entry"); strxmov(path, mysql_data_home, "/", db, "/", name, NullS); - if (openfrm(path,alias, + while (openfrm(path,alias, (uint) (HA_OPEN_KEYFILE | HA_OPEN_RNDFILE | HA_GET_INDEX | HA_TRY_READ_ONLY), READ_KEYINFO | COMPUTE_TYPES | EXTRA_RECORD, thd->open_options, entry)) { if (!entry->crashed) - goto err; // Can't repair the table + { + /* + Frm file could not be found on disk + Since it does not exist, no one can be using it + LOCK_open has been locked to protect from someone else + trying to discover the table at the same time. 
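+
+        Illustrative sketch of the retry logic that follows (added
+        for clarity, not part of the original comment):
+
+          while (openfrm() fails)
+          {
+            if (entry->crashed)        -> take the repair path below
+            else if (first attempt)    -> create_table_from_handler(),
+                                          which writes a discovered frm
+                                          to disk, then retry openfrm()
+            else                       -> goto err
+          }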
+      */
+      if (discover_retry_count++ != 0)
+        goto err;
+      if (create_table_from_handler(db, name, true) != 0)
+        goto err;
+
+      thd->clear_error(); // Clear error message
+      continue;
+    }
+    // Code below is for repairing a crashed file
     TABLE_LIST table_list;
     bzero((char*) &table_list, sizeof(table_list)); // just for safe
     table_list.db=(char*) db;
@@ -1374,6 +1390,7 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db,
 
     if (error)
       goto err;
+    break;
   }
   /*
     If we are here, there was no fatal error (but error may be still
diff --git a/sql/sql_class.h b/sql/sql_class.h
index aa526d5e474..d5eb7a9fd0e 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -688,7 +688,10 @@ public:
   THD_TRANS all;			// Trans since BEGIN WORK
   THD_TRANS stmt;			// Trans for current statement
   uint bdb_lock_count;
-
+  uint ndb_lock_count;
+#ifdef HAVE_NDBCLUSTER_DB
+  void* ndb;
+#endif
   /*
     Tables changed in transaction (that must be invalidated in query cache).
     List contain only transactional tables, that not invalidated in query
@@ -878,7 +881,8 @@ public:
   {
 #ifdef USING_TRANSACTIONS
     return (transaction.all.bdb_tid != 0 ||
-	    transaction.all.innodb_active_trans != 0);
+	    transaction.all.innodb_active_trans != 0 ||
+	    transaction.all.ndb_tid != 0);
 #else
     return 0;
 #endif
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index d5f77bf545d..c46a9823a52 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -1148,6 +1148,35 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name,
     }
   }
 
+  /*
+    Check that a table with the given name does not already
+    exist in any storage engine. In such a case it should
+    be discovered and the error ER_TABLE_EXISTS_ERROR be returned,
+    unless the user specified CREATE TABLE IF NOT EXISTS.
+    The LOCK_open mutex has been locked to make sure no
+    one else is attempting to discover the table. Since
+    it's not on disk as a frm file, no one could be using it!
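+
+    For example (illustrative, not part of the original comment): if
+    t1 exists in NDB but has no frm file on disk, CREATE TABLE t1 (a int)
+    discovers it and fails with ER_TABLE_EXISTS_ERROR, while
+    CREATE TABLE IF NOT EXISTS t1 (a int) discovers it and returns
+    success.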
+ */ + if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE)) + { + bool create_if_not_exists = + create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS; + if (!create_table_from_handler(db, table_name, + create_if_not_exists)) + { + DBUG_PRINT("info", ("Table already existed in handler")); + + if (create_if_not_exists) + { + create_info->table_existed= 1; // Mark that table existed + error= 0; + } + else + my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name); + goto end; + } + } + thd->proc_info="creating table"; create_info->table_existed= 0; // Mark that table is created diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index b87b0b29677..568a526fd58 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -181,7 +181,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token ACTION %token AGGREGATE_SYM %token ALL -%token AND +%token AND_SYM %token AS %token ASC %token AUTO_INC @@ -305,6 +305,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token NAMES_SYM %token NATIONAL_SYM %token NATURAL +%token NDBCLUSTER_SYM %token NEW_SYM %token NCHAR_SYM %token NCHAR_STRING @@ -318,7 +319,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token OPEN_SYM %token OPTION %token OPTIONALLY -%token OR +%token OR_SYM %token OR_OR_CONCAT %token ORDER_SYM %token OUTER @@ -574,8 +575,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token BEFORE_SYM %left SET_VAR -%left OR_OR_CONCAT OR XOR -%left AND +%left OR_OR_CONCAT OR_SYM XOR +%left AND_SYM %left BETWEEN_SYM CASE_SYM WHEN_SYM THEN_SYM ELSE %left EQ EQUAL_SYM GE GT_SYM LE LT NE IS LIKE REGEXP IN_SYM %left '|' @@ -727,7 +728,7 @@ END_OF_INPUT %type <NONE> '-' '+' '*' '/' '%' '(' ')' - ',' '!' '{' '}' '&' '|' AND OR OR_OR_CONCAT BETWEEN_SYM CASE_SYM + ',' '!' '{' '}' '&' '|' AND_SYM OR_SYM OR_OR_CONCAT BETWEEN_SYM CASE_SYM THEN_SYM WHEN_SYM DIV_SYM MOD_SYM %% @@ -2421,14 +2422,14 @@ expr_expr: { $$= new Item_func_not(new Item_in_subselect($1, $4)); } - | expr BETWEEN_SYM no_and_expr AND expr + | expr BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_between($1,$3,$5); } - | expr NOT BETWEEN_SYM no_and_expr AND expr + | expr NOT BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_not(new Item_func_between($1,$4,$6)); } | expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); } - | expr OR expr { $$= new Item_cond_or($1,$3); } + | expr OR_SYM expr { $$= new Item_cond_or($1,$3); } | expr XOR expr { $$= new Item_cond_xor($1,$3); } - | expr AND expr { $$= new Item_cond_and($1,$3); } + | expr AND_SYM expr { $$= new Item_cond_and($1,$3); } | expr SOUNDS_SYM LIKE expr { $$= new Item_func_eq(new Item_func_soundex($1), @@ -2469,14 +2470,14 @@ expr_expr: /* expressions that begin with 'expr' that do NOT follow IN_SYM */ no_in_expr: - no_in_expr BETWEEN_SYM no_and_expr AND expr + no_in_expr BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_between($1,$3,$5); } - | no_in_expr NOT BETWEEN_SYM no_and_expr AND expr + | no_in_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_not(new Item_func_between($1,$4,$6)); } | no_in_expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); } - | no_in_expr OR expr { $$= new Item_cond_or($1,$3); } + | no_in_expr OR_SYM expr { $$= new Item_cond_or($1,$3); } | no_in_expr XOR expr { $$= new Item_cond_xor($1,$3); } - | no_in_expr AND expr { $$= new Item_cond_and($1,$3); } + | no_in_expr AND_SYM expr { $$= new Item_cond_and($1,$3); } | no_in_expr SOUNDS_SYM LIKE expr { $$= new Item_func_eq(new Item_func_soundex($1), @@ -2527,12 +2528,12 @@ 
no_and_expr:
 	  { $$= new Item_func_not(new Item_in_subselect($1, $4)); }
-	| no_and_expr BETWEEN_SYM no_and_expr AND expr
+	| no_and_expr BETWEEN_SYM no_and_expr AND_SYM expr
 	  { $$= new Item_func_between($1,$3,$5); }
-	| no_and_expr NOT BETWEEN_SYM no_and_expr AND expr
+	| no_and_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr
 	  { $$= new Item_func_not(new Item_func_between($1,$4,$6)); }
 	| no_and_expr OR_OR_CONCAT expr	{ $$= or_or_concat(YYTHD, $1,$3); }
-	| no_and_expr OR expr		{ $$= new Item_cond_or($1,$3); }
+	| no_and_expr OR_SYM expr	{ $$= new Item_cond_or($1,$3); }
 	| no_and_expr XOR expr		{ $$= new Item_cond_xor($1,$3); }
 	| no_and_expr SOUNDS_SYM LIKE expr
 	  {
@@ -4147,8 +4148,8 @@ show_param:
 	      YYABORT;
 	  }
 	| NEW_SYM MASTER_SYM FOR_SYM SLAVE WITH MASTER_LOG_FILE_SYM EQ
-	  TEXT_STRING_sys AND MASTER_LOG_POS_SYM EQ ulonglong_num
-	  AND MASTER_SERVER_ID_SYM EQ
+	  TEXT_STRING_sys AND_SYM MASTER_LOG_POS_SYM EQ ulonglong_num
+	  AND_SYM MASTER_SERVER_ID_SYM EQ
 	  ULONG_NUM
           {
 	    Lex->sql_command = SQLCOM_SHOW_NEW_MASTER;
@@ -4983,6 +4984,7 @@ keyword:
 	| NAMES_SYM		{}
 	| NATIONAL_SYM		{}
 	| NCHAR_SYM		{}
+	| NDBCLUSTER_SYM	{}
 	| NEXT_SYM		{}
 	| NEW_SYM		{}
 	| NO_SYM		{}
@@ -5434,7 +5436,7 @@ grant_privilege:
 
 opt_and:
 	/* empty */	{}
-	| AND	{}
+	| AND_SYM	{}
 	;
 
 require_list:
diff --git a/sql/table.cc b/sql/table.cc
index 1b7d30560ef..281a8c10409 100644
--- a/sql/table.cc
+++ b/sql/table.cc
@@ -944,7 +944,8 @@ static void frm_error(int error, TABLE *form, const char *name, myf errortype)
     break;
   case 2:
   {
-    datext=form->file ? *form->file->bas_ext() : "";
+    datext= form->file ? *form->file->bas_ext() : "";
+    datext= datext==NullS ? "" : datext;
     err_no= (my_errno == ENOENT) ? ER_FILE_NOT_FOUND :
       (my_errno == EAGAIN) ? ER_FILE_USED : ER_CANT_OPEN_FILE;
     my_error(err_no,errortype,
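
Taken together, ha_discover() in handler.cc, readfrm()/writefrm() in mysql_priv.h and the retry loop in sql_base.cc form a discovery round trip. A minimal sketch of the glue that create_table_from_handler() is expected to provide follows; this is an illustration only, and the exact frm path handling (for example whether writefrm() appends the .frm extension itself) is an assumption, not something this changeset shows:

    /* Illustrative sketch -- not part of this changeset */
    static int discover_and_store_frm(const char *db, const char *name)
    {
      const void *frmblob;
      uint frmlen;
      char path[FN_REFLEN];

      /* Ask all handlers (currently only NDB) for a packed frm */
      if (ha_discover(db, name, &frmblob, &frmlen))
        return 1;                      /* unknown to every handler */

      /* Same path layout as open_unireg_entry() builds */
      strxmov(path, mysql_data_home, "/", db, "/", name, NullS);
      int error= writefrm(path, frmblob, frmlen);
      my_free((char*) frmblob, MYF(0));
      return error;
    }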