-rw-r--r--   include/my_base.h        3
-rw-r--r--   sql/Makefile.am         10
-rw-r--r--   sql/discover.cc        172
-rwxr-xr-x   sql/ha_ndbcluster.cc  2943
-rwxr-xr-x   sql/ha_ndbcluster.h    218
-rw-r--r--   sql/handler.cc          92
-rw-r--r--   sql/handler.h           24
-rw-r--r--   sql/lex.h                8
-rw-r--r--   sql/mysql_priv.h         7
-rw-r--r--   sql/mysqld.cc           30
-rw-r--r--   sql/set_var.cc           4
-rw-r--r--   sql/sql_base.cc         21
-rw-r--r--   sql/sql_class.h          8
-rw-r--r--   sql/sql_table.cc        29
-rw-r--r--   sql/sql_yacc.yy         40
-rw-r--r--   sql/table.cc             3
16 files changed, 3564 insertions, 48 deletions
diff --git a/include/my_base.h b/include/my_base.h
index b38e177fd89..28d78d08401 100644
--- a/include/my_base.h
+++ b/include/my_base.h
@@ -287,6 +287,9 @@ enum ha_base_keytype {
#define HA_ERR_ROW_IS_REFERENCED 152 /* Cannot delete a parent row */
#define HA_ERR_NO_SAVEPOINT 153 /* No savepoint with that name */
#define HA_ERR_NON_UNIQUE_BLOCK_SIZE 154 /* Non unique key block size */
+#define HA_ERR_OLD_METADATA 155 /* The frm file on disk is old */
+#define HA_ERR_TABLE_EXIST 156 /* The table existed in storage engine */
+#define HA_ERR_NO_CONNECTION 157 /* Could not connect to storage engine */
/* Other constants */
diff --git a/sql/Makefile.am b/sql/Makefile.am
index 4aa6aaaa1ee..6845e0b5de8 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -16,12 +16,11 @@
#called from the top level Makefile
-
MYSQLDATAdir = $(localstatedir)
MYSQLSHAREdir = $(pkgdatadir)
MYSQLBASEdir= $(prefix)
INCLUDES = @MT_INCLUDES@ \
- @bdb_includes@ @innodb_includes@ \
+ @bdb_includes@ @innodb_includes@ @ndbcluster_includes@ \
-I$(top_srcdir)/include -I$(top_srcdir)/regex \
-I$(srcdir) $(openssl_includes)
WRAPLIBS= @WRAPLIBS@
@@ -42,6 +41,7 @@ LDADD = @isam_libs@ \
mysqld_LDADD = @MYSQLD_EXTRA_LDFLAGS@ \
@bdb_libs@ @innodb_libs@ @pstack_libs@ \
@innodb_system_libs@ \
+ @ndbcluster_libs@ @ndbcluster_system_libs@ \
$(LDADD) $(CXXLDFLAGS) $(WRAPLIBS) @LIBDL@ @openssl_libs@
noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
item_strfunc.h item_timefunc.h item_uniq.h \
@@ -52,7 +52,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
field.h handler.h \
ha_isammrg.h ha_isam.h ha_myisammrg.h\
ha_heap.h ha_myisam.h ha_berkeley.h ha_innodb.h \
- opt_range.h protocol.h \
+ ha_ndbcluster.h opt_range.h protocol.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h\
lex.h lex_symbol.h sql_acl.h sql_crypt.h \
log_event.h sql_repl.h slave.h \
@@ -75,11 +75,11 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc \
procedure.cc item_uniq.cc sql_test.cc \
log.cc log_event.cc init.cc derror.cc sql_acl.cc \
unireg.cc des_key_file.cc \
- time.cc opt_range.cc opt_sum.cc \
+ discover.cc time.cc opt_range.cc opt_sum.cc \
records.cc filesort.cc handler.cc \
ha_heap.cc ha_myisam.cc ha_myisammrg.cc \
ha_berkeley.cc ha_innodb.cc \
- ha_isam.cc ha_isammrg.cc \
+ ha_isam.cc ha_isammrg.cc ha_ndbcluster.cc \
sql_db.cc sql_table.cc sql_rename.cc sql_crypt.cc \
sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \
sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \
diff --git a/sql/discover.cc b/sql/discover.cc
new file mode 100644
index 00000000000..e260f44a8db
--- /dev/null
+++ b/sql/discover.cc
@@ -0,0 +1,172 @@
+/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+/* Functions for discovery of .frm files from a handler */
+
+#include "mysql_priv.h"
+#include <my_dir.h>
+
+/*
+ Read the contents of a .frm file
+
+ SYNOPSIS
+ readfrm()
+
+ name path to table-file "db/name"
+ frmdata frm data
+ len length of the read frmdata
+
+ RETURN VALUES
+ 0 ok
+ 1 Could not open file
+ 2 Could not stat file
+   3    Could not allocate data for read, or could not read file
+
+ frmdata and len are set to 0 on error
+*/
+
+int readfrm(const char *name,
+ const void **frmdata, uint *len)
+{
+ int error;
+ char index_file[FN_REFLEN];
+ File file;
+ ulong read_len;
+ char *read_data;
+ MY_STAT state;
+ DBUG_ENTER("readfrm");
+ DBUG_PRINT("enter",("name: '%s'",name));
+
+ *frmdata= NULL; // In case of errors
+ *len= 0;
+ error= 1;
+ if ((file=my_open(fn_format(index_file,name,"",reg_ext,4),
+ O_RDONLY | O_SHARE,
+ MYF(0))) < 0)
+ goto err_end;
+
+ // Get length of file
+ error= 2;
+ if (my_fstat(file, &state, MYF(0)))
+ goto err;
+ read_len= state.st_size;
+
+ // Read whole frm file
+ error= 3;
+ read_data= 0;
+ if (read_string(file, &read_data, read_len))
+ goto err;
+
+ // Setup return data
+ *frmdata= (void*) read_data;
+ *len= read_len;
+ error= 0;
+
+ err:
+ if (file > 0)
+ VOID(my_close(file,MYF(MY_WME)));
+
+ err_end: /* Here when no file */
+ DBUG_RETURN (error);
+} /* readfrm */
+
+
+/*
+ Write the content of a frm data pointer
+ to a frm file
+
+ SYNOPSIS
+ writefrm()
+
+ name path to table-file "db/name"
+ frmdata frm data
+ len length of the frmdata
+
+ RETURN VALUES
+   0    ok
+   1    Could not create file
+   2    Could not write file
+*/
+
+int writefrm(const char *name, const void *frmdata, uint len)
+{
+ File file;
+ char index_file[FN_REFLEN];
+ int error;
+ DBUG_ENTER("writefrm");
+ DBUG_PRINT("enter",("name: '%s' len: %d ",name,len));
+ //DBUG_DUMP("frmdata", (char*)frmdata, len);
+
+  error= 0;
+  if ((file=my_create(fn_format(index_file,name,"",reg_ext,4),
+                      CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
+  {
+    if (my_write(file,(byte*)frmdata,len,MYF(MY_WME | MY_NABP)))
+      error= 2;
+    VOID(my_close(file,MYF(0)));   // Only close the fd if create succeeded
+  }
+  else
+    error= 1;                      // Could not create file
+ DBUG_RETURN(error);
+} /* writefrm */
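+
+/*
+  Usage sketch (illustrative only; nothing in this file calls it):
+  round-trip a frm through the two helpers above. The table paths are
+  hypothetical; readfrm() allocates the buffer and the caller frees it.
+*/
+#ifdef DISCOVER_USAGE_EXAMPLE
+static void example_frm_roundtrip(void)
+{
+  const void *frmdata;
+  uint len;
+  if (!readfrm("./test/t1", &frmdata, &len))          // 0 means success
+  {
+    (void) writefrm("./test/t1_copy", frmdata, len);  // creates t1_copy.frm
+    my_free((char*) frmdata, MYF(0));
+  }
+}
+#endif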
+
+
+
+
+/*
+ Try to discover table from handler and
+ if found, write the frm file to disk.
+
+ RETURN VALUES:
+  0 : Table existed in handler and was created
+      on disk if so requested
+ 1 : Table does not exist
+ >1 : error
+
+*/
+
+int create_table_from_handler(const char *db,
+ const char *name,
+ bool create_if_found)
+{
+ int error= 0;
+ const void* frmblob = NULL;
+ char path[FN_REFLEN];
+ uint frmlen = 0;
+ DBUG_ENTER("create_table_from_handler");
+ DBUG_PRINT("enter", ("create_if_found: %d", create_if_found));
+
+ if (ha_discover(db, name, &frmblob, &frmlen))
+ DBUG_RETURN(1); // Table does not exist
+
+ // Table exists in handler
+ if (create_if_found)
+ {
+ (void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS);
+ // Save the frm file
+ error = writefrm(path, frmblob, frmlen);
+ }
+
+ if (frmblob)
+ my_free((char*) frmblob,MYF(0));
+ DBUG_RETURN(error);
+}
+
+int table_exists_in_handler(const char *db,
+ const char *name)
+{
+ return (create_table_from_handler(db, name, false) == 0);
+}
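+
+/*
+  Usage sketch (illustrative only; the real callers are presumably the
+  changes this commit makes to sql_base.cc and sql_table.cc): how the two
+  entry points above are meant to be used when a table is missing from the
+  local data dictionary. The database and table names are hypothetical.
+*/
+#ifdef DISCOVER_USAGE_EXAMPLE
+static void example_discover(void)
+{
+  if (table_exists_in_handler("test", "t1"))
+  {
+    // Table exists in some storage engine: fetch its frm via
+    // ha_discover() and write test/t1.frm to disk so it can be opened
+    (void) create_table_from_handler("test", "t1", true);
+  }
+}
+#endif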
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
new file mode 100755
index 00000000000..3bc322878d1
--- /dev/null
+++ b/sql/ha_ndbcluster.cc
@@ -0,0 +1,2943 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+/*
+ This file defines the NDB Cluster handler: the interface between MySQL and
+ NDB Cluster
+*/
+
+/*
+ TODO
+  After CREATE DATABASE, run discover on all tables in that database
+
+*/
+
+
+#ifdef __GNUC__
+#pragma implementation // gcc: Class implementation
+#endif
+
+#include "mysql_priv.h"
+
+#ifdef HAVE_NDBCLUSTER_DB
+#include <my_dir.h>
+#include "ha_ndbcluster.h"
+#include <ndbapi/NdbApi.hpp>
+#include <ndbapi/NdbScanFilter.hpp>
+
+#define USE_DISCOVER_ON_STARTUP
+//#define USE_NDB_POOL
+
+// Default value for parallelism
+static const int parallelism= 240;
+
+#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
+
+#define ERR_PRINT(err) \
+ DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
+
+#define ERR_RETURN(err) \
+{ \
+ ERR_PRINT(err); \
+ DBUG_RETURN(ndb_to_mysql_error(&err)); \
+}
+
+// Typedefs for long names
+typedef NdbDictionary::Column NDBCOL;
+typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Index NDBINDEX;
+typedef NdbDictionary::Dictionary NDBDICT;
+
+bool ndbcluster_inited= false;
+
+// Handler synchronization
+pthread_mutex_t ndbcluster_mutex;
+
+// Table lock handling
+static HASH ndbcluster_open_tables;
+
+static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
+ my_bool not_used __attribute__((unused)));
+static NDB_SHARE *get_share(const char *table_name);
+static void free_share(NDB_SHARE *share);
+
+static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
+static int unpackfrm(const void **data, uint *len,
+ const void* pack_data);
+
+/*
+ Error handling functions
+*/
+
+struct err_code_mapping
+{
+ int ndb_err;
+ int my_err;
+};
+
+static const err_code_mapping err_map[]=
+{
+ { 626, HA_ERR_KEY_NOT_FOUND },
+ { 630, HA_ERR_FOUND_DUPP_KEY },
+ { 893, HA_ERR_FOUND_DUPP_UNIQUE },
+ { 721, HA_ERR_TABLE_EXIST },
+ { 241, HA_ERR_OLD_METADATA },
+ { -1, -1 }
+};
+
+
+static int ndb_to_mysql_error(const NdbError *err)
+{
+ uint i;
+ for (i=0 ; err_map[i].ndb_err != err->code ; i++)
+ {
+ if (err_map[i].my_err == -1)
+ return err->code;
+ }
+ return err_map[i].my_err;
+}
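+
+/*
+  Behaviour sketch for the mapping above (illustrative only): a code
+  present in err_map is translated, anything else is passed through
+  unchanged once the sentinel row is reached. Assumes NdbError's code
+  member is writable; 4711 is an arbitrary code not in err_map.
+*/
+#ifdef NDB_ERR_MAPPING_EXAMPLE
+static void example_err_mapping(void)
+{
+  NdbError err;
+  err.code= 626;                 // mapped row in err_map
+  DBUG_ASSERT(ndb_to_mysql_error(&err) == HA_ERR_KEY_NOT_FOUND);
+  err.code= 4711;                // not in err_map: passed through
+  DBUG_ASSERT(ndb_to_mysql_error(&err) == 4711);
+}
+#endif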
+
+
+/*
+  Take care of the error that occurred in NDB
+
+ RETURN
+ 0 No error
+ # The mapped error code
+*/
+
+int ha_ndbcluster::ndb_err(NdbConnection *trans)
+{
+ const NdbError err= trans->getNdbError();
+ if (!err.code)
+ return 0; // Don't log things to DBUG log if no error
+ DBUG_ENTER("ndb_err");
+
+ ERR_PRINT(err);
+ switch (err.classification) {
+ case NdbError::SchemaError:
+ {
+ NDBDICT *dict= m_ndb->getDictionary();
+ DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
+ dict->invalidateTable(m_tabname);
+ break;
+ }
+ default:
+ break;
+ }
+ DBUG_RETURN(ndb_to_mysql_error(&err));
+}
+
+
+/*
+ Instruct NDB to set the value of the hidden primary key
+*/
+
+bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op,
+ uint fieldnr, const byte *field_ptr)
+{
+ DBUG_ENTER("set_hidden_key");
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr,
+ NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0);
+}
+
+
+/*
+ Instruct NDB to set the value of one primary key attribute
+*/
+
+int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
+ uint fieldnr, const byte *field_ptr)
+{
+ uint32 pack_len= field->pack_length();
+ DBUG_ENTER("set_ndb_key");
+ DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d",
+ fieldnr, field->field_name, field->type(),
+ pack_len));
+ DBUG_DUMP("key", (char*)field_ptr, pack_len);
+
+ switch (field->type()) {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_STRING:
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ default:
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+ DBUG_RETURN(3);
+}
+
+
+/*
+ Instruct NDB to set the value of one attribute
+*/
+
+int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
+ uint fieldnr)
+{
+ const byte* field_ptr= field->ptr;
+ uint32 pack_len= field->pack_length();
+ DBUG_ENTER("set_ndb_value");
+ DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s",
+ fieldnr, field->field_name, field->type(),
+ pack_len, field->is_null()?"Y":"N"));
+ DBUG_DUMP("value", (char*) field_ptr, pack_len);
+
+ if (field->is_null())
+ {
+ // Set value to NULL
+ DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
+ }
+
+ switch (field->type()) {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_STRING:
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ default:
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+ DBUG_RETURN(3);
+}
+
+
+/*
+ Instruct NDB to fetch one field
+  - data is read directly into the buffer provided by field_ptr;
+    if field_ptr is NULL, data is read into memory provided by the NDB API
+*/
+
+int ha_ndbcluster::get_ndb_value(NdbOperation *op,
+ uint field_no, byte *field_ptr)
+{
+ DBUG_ENTER("get_ndb_value");
+ DBUG_PRINT("enter", ("field_no: %d", field_no));
+ m_value[field_no]= op->getValue(field_no, field_ptr);
+  DBUG_RETURN(m_value[field_no] == NULL);
+}
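+
+/*
+  The two call forms of get_ndb_value() (sketch):
+    get_ndb_value(op, i, field->ptr); // read straight into the row buffer
+    get_ndb_value(op, i, NULL);       // NDB API allocates storage; the
+                                      // value is later read via m_value[i]
+*/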
+
+
+/*
+ Get metadata for this table from NDB
+
+ IMPLEMENTATION
+ - save the NdbDictionary::Table for easy access
+ - check that frm-file on disk is equal to frm-file
+ of table accessed in NDB
+ - build a list of the indexes for the table
+*/
+
+int ha_ndbcluster::get_metadata(const char *path)
+{
+ NDBDICT *dict= m_ndb->getDictionary();
+ const NDBTAB *tab;
+  const void *data= NULL, *pack_data= NULL;
+ uint ndb_columns, mysql_columns, length, pack_length, i;
+ int error;
+ DBUG_ENTER("get_metadata");
+ DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
+
+ if (!(tab= dict->getTable(m_tabname)))
+ ERR_RETURN(dict->getNdbError());
+ DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
+
+ /*
+ This is the place to check that the table we got from NDB
+ is equal to the one on local disk
+ */
+ ndb_columns= (uint) tab->getNoOfColumns();
+ mysql_columns= table->fields;
+ if (table->primary_key == MAX_KEY)
+ ndb_columns--;
+ if (ndb_columns != mysql_columns)
+ {
+ DBUG_PRINT("error",
+ ("Wrong number of columns, ndb: %d mysql: %d",
+ ndb_columns, mysql_columns));
+ DBUG_RETURN(HA_ERR_OLD_METADATA);
+ }
+
+ /*
+ Compare FrmData in NDB with frm file from disk.
+ */
+ error= 0;
+ if (readfrm(path, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_RETURN(1);
+ }
+
+ if ((pack_length != tab->getFrmLength()) ||
+ (memcmp(pack_data, tab->getFrmData(), pack_length)))
+ {
+ DBUG_PRINT("error",
+ ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
+ pack_length, tab->getFrmLength(),
+ memcmp(pack_data, tab->getFrmData(), pack_length)));
+ DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
+ DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
+ error= HA_ERR_OLD_METADATA;
+ }
+ my_free((char*)data, MYF(0));
+ my_free((char*)pack_data, MYF(0));
+ if (error)
+ DBUG_RETURN(error);
+
+ // All checks OK, lets use the table
+ m_table= (void*)tab;
+
+ for (i= 0; i < MAX_KEY; i++)
+ m_indextype[i]= UNDEFINED_INDEX;
+
+ // Save information about all known indexes
+ for (i= 0; i < table->keys; i++)
+ m_indextype[i] = get_index_type_from_table(i);
+
+ DBUG_RETURN(0);
+}
+
+/*
+ Decode the type of an index from information
+ provided in table object
+*/
+NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const
+{
+ if (index_no == table->primary_key)
+ return PRIMARY_KEY_INDEX;
+ else
+ return ((table->key_info[index_no].flags & HA_NOSAME) ?
+ UNIQUE_INDEX :
+ ORDERED_INDEX);
+}
+
+
+void ha_ndbcluster::release_metadata()
+{
+ DBUG_ENTER("release_metadata");
+ DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
+
+ m_table= NULL;
+
+ DBUG_VOID_RETURN;
+}
+
+static const ulong index_type_flags[]=
+{
+ /* UNDEFINED_INDEX */
+ 0,
+
+ /* PRIMARY_KEY_INDEX */
+ HA_ONLY_WHOLE_INDEX |
+ HA_WRONG_ASCII_ORDER |
+ HA_NOT_READ_PREFIX_LAST,
+
+ /* UNIQUE_INDEX */
+ HA_ONLY_WHOLE_INDEX |
+ HA_WRONG_ASCII_ORDER |
+ HA_NOT_READ_PREFIX_LAST,
+
+ /* ORDERED_INDEX */
+ HA_READ_NEXT |
+ HA_READ_PREV |
+ HA_NOT_READ_AFTER_KEY
+};
+
+static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
+
+inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
+{
+ return table->keynames.type_names[idx_no];
+}
+
+inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
+{
+ DBUG_ASSERT(idx_no < MAX_KEY);
+ return m_indextype[idx_no];
+}
+
+
+/*
+ Get the flags for an index
+
+ RETURN
+ flags depending on the type of the index.
+*/
+
+inline ulong ha_ndbcluster::index_flags(uint idx_no) const
+{
+ DBUG_ENTER("index_flags");
+ DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
+ DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
+}
+
+
+int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
+{
+ KEY* key_info= table->key_info + table->primary_key;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+ DBUG_ENTER("set_primary_key");
+
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ if (set_ndb_key(op, field,
+ key_part->fieldnr-1, key))
+ ERR_RETURN(op->getNdbError());
+ key += key_part->length;
+ }
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::set_primary_key(NdbOperation *op)
+{
+ DBUG_ENTER("set_primary_key");
+ KEY* key_info= table->key_info + table->primary_key;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ if (set_ndb_key(op, field,
+ key_part->fieldnr-1, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Read one record from NDB using primary key
+*/
+
+int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
+{
+ uint no_fields= table->fields, i;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ THD *thd= current_thd;
+ DBUG_ENTER("pk_read");
+ DBUG_PRINT("enter", ("key_len: %u", key_len));
+ DBUG_DUMP("key", (char*)key, key_len);
+
+ if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0)
+ goto err;
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ DBUG_DUMP("key", (char*)key, 8);
+ if (set_hidden_key(op, no_fields, key))
+ goto err;
+ // Read key at the same time, for future reference
+ if (get_ndb_value(op, no_fields, NULL))
+ goto err;
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op, key)))
+      DBUG_RETURN(res);
+ }
+
+ // Read non-key field(s)
+ for (i= 0; i < no_fields; i++)
+ {
+ Field *field= table->field[i];
+ if (thd->query_id == field->query_id)
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ goto err;
+ }
+ else
+ {
+ // Attribute was not to be read
+ m_value[i]= NULL;
+ }
+ }
+
+ if (trans->execute(NoCommit, IgnoreError) != 0)
+ {
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(ndb_err(trans));
+ }
+
+  // The values have now been fetched from NDB
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+
+ err:
+ ERR_RETURN(trans->getNdbError());
+}
+
+
+/*
+ Read one record from NDB using unique secondary index
+*/
+
+int ha_ndbcluster::unique_index_read(const byte *key,
+ uint key_len, byte *buf)
+{
+ NdbConnection *trans= m_active_trans;
+ const char *index_name;
+ NdbIndexOperation *op;
+ THD *thd= current_thd;
+ byte *key_ptr;
+ KEY* key_info;
+ KEY_PART_INFO *key_part, *end;
+ uint i;
+ DBUG_ENTER("unique_index_read");
+ DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
+ DBUG_DUMP("key", (char*)key, key_len);
+
+ index_name= get_index_name(active_index);
+ if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) ||
+ op->readTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ // Set secondary index key(s)
+ key_ptr= (byte *) key;
+ key_info= table->key_info + active_index;
+ DBUG_ASSERT(key_info->key_length == key_len);
+ end= (key_part= key_info->key_part) + key_info->key_parts;
+
+ for (i= 0; key_part != end; key_part++, i++)
+ {
+ if (set_ndb_key(op, key_part->field, i, key_ptr))
+ ERR_RETURN(trans->getNdbError());
+ key_ptr+= key_part->length;
+ }
+
+ // Get non-index attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ // Attribute was not to be read
+ m_value[i]= NULL;
+ }
+ }
+
+ if (trans->execute(NoCommit, IgnoreError) != 0)
+ {
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(ndb_err(trans));
+ }
+  // The values have now been fetched from NDB
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+}
+
+/*
+ Get the next record of a started scan
+*/
+
+inline int ha_ndbcluster::next_result(byte *buf)
+{
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("next_result");
+
+ if (cursor->nextResult() == 0)
+ {
+ // One more record found
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+ }
+ table->status= STATUS_NOT_FOUND;
+ if (ndb_err(trans))
+ ERR_RETURN(trans->getNdbError());
+
+ // No more records
+ DBUG_PRINT("info", ("No more records"));
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+}
+
+
+/*
+ Read record(s) from NDB using ordered index scan
+*/
+
+int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag)
+{
+ uint no_fields= table->fields;
+ uint tot_len, i;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+ NdbScanOperation *op;
+ const char *bound_str= NULL;
+ const char *index_name;
+ NdbOperation::BoundType bound_type = NdbOperation::BoundEQ;
+ bool can_be_handled_by_ndb= FALSE;
+ byte *key_ptr;
+ KEY *key_info;
+ THD* thd = current_thd;
+ DBUG_ENTER("ordered_index_scan");
+ DBUG_PRINT("enter", ("index: %u", active_index));
+ DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
+
+ index_name= get_index_name(active_index);
+ if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+ if (!(cursor= op->readTuples(parallelism)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ switch (find_flag) {
+ case HA_READ_KEY_EXACT: /* Find first record else error */
+ bound_str= "HA_READ_KEY_EXACT";
+ bound_type= NdbOperation::BoundEQ;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_KEY_OR_NEXT: /* Record or next record */
+ bound_str= "HA_READ_KEY_OR_NEXT";
+ bound_type= NdbOperation::BoundLE;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_KEY_OR_PREV: /* Record or previous */
+ bound_str= "HA_READ_KEY_OR_PREV";
+ bound_type= NdbOperation::BoundGE;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
+ bound_str= "HA_READ_AFTER_KEY";
+ bound_type= NdbOperation::BoundLT;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
+ bound_str= "HA_READ_BEFORE_KEY";
+ bound_type= NdbOperation::BoundGT;
+ can_be_handled_by_ndb= TRUE;
+ break;
+  case HA_READ_PREFIX: /* Key which has same prefix */
+ bound_str= "HA_READ_PREFIX";
+ break;
+ case HA_READ_PREFIX_LAST: /* Last key with the same prefix */
+ bound_str= "HA_READ_PREFIX_LAST";
+ break;
+ case HA_READ_PREFIX_LAST_OR_PREV:
+ /* Last or prev key with the same prefix */
+ bound_str= "HA_READ_PREFIX_LAST_OR_PREV";
+ break;
+ default:
+ bound_str= "UNKNOWN";
+ break;
+ }
+ DBUG_PRINT("info", ("find_flag: %s, bound_type: %d,"
+ "can_be_handled_by_ndb: %d",
+ bound_str, bound_type, can_be_handled_by_ndb));
+ if (!can_be_handled_by_ndb)
+ DBUG_RETURN(1);
+
+ // Set bounds using key data
+ tot_len= 0;
+ key_ptr= (byte *) key;
+ key_info= table->key_info + active_index;
+ for (i= 0; i < key_info->key_parts; i++)
+ {
+ Field* field= key_info->key_part[i].field;
+ uint32 field_len= field->pack_length();
+ DBUG_PRINT("info", ("Set index bound on %s",
+ field->field_name));
+ DBUG_DUMP("key", (char*)key_ptr, field_len);
+
+ if (op->setBound(field->field_name,
+ bound_type,
+ key_ptr,
+ field_len) != 0)
+ ERR_RETURN(op->getNdbError());
+
+ key_ptr+= field_len;
+ tot_len+= field_len;
+ if (tot_len >= key_len)
+ break;
+ }
+
+ // Define attributes to read
+ for (i= 0; i < no_fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ m_value[i]= NULL;
+ }
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= no_fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+
+
+#if 0
+/*
+ Read record(s) from NDB using full table scan with filter
+ */
+
+int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag)
+{
+ uint no_fields= table->fields;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+
+ DBUG_ENTER("filtered_scan");
+ DBUG_PRINT("enter", ("key_len: %u, index: %u",
+ key_len, active_index));
+ DBUG_DUMP("key", (char*)key, key_len);
+ DBUG_PRINT("info", ("Starting a new filtered scan on %s",
+ m_tabname));
+ NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
+ if (!op)
+ ERR_RETURN(trans->getNdbError());
+
+ cursor= op->readTuples(parallelism);
+ if (!cursor)
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ {
+ // Start scan filter
+ NdbScanFilter sf(op);
+ sf.begin();
+
+ // Set filter using the supplied key data
+ byte *key_ptr= (byte *) key;
+ uint tot_len= 0;
+ KEY* key_info= table->key_info + active_index;
+ for (uint k= 0; k < key_info->key_parts; k++)
+ {
+ KEY_PART_INFO* key_part= key_info->key_part+k;
+ Field* field= key_part->field;
+ uint ndb_fieldnr= key_part->fieldnr-1;
+ DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr));
+ // const NDBCOL *col= tab->getColumn(ndb_fieldnr);
+ uint32 field_len= field->pack_length();
+ DBUG_DUMP("key", (char*)key, field_len);
+
+ DBUG_PRINT("info", ("Column %s, type: %d, len: %d",
+ field->field_name, field->real_type(), field_len));
+
+ // Define scan filter
+ if (field->real_type() == MYSQL_TYPE_STRING)
+ sf.eq(ndb_fieldnr, key_ptr, field_len);
+ else
+ {
+ if (field_len == 8)
+ sf.eq(ndb_fieldnr, (Uint64)*key_ptr);
+ else if (field_len <= 4)
+ sf.eq(ndb_fieldnr, (Uint32)*key_ptr);
+ else
+ DBUG_RETURN(1);
+ }
+
+ key_ptr += field_len;
+ tot_len += field_len;
+
+ if (tot_len >= key_len)
+ break;
+ }
+ // End scan filter
+ sf.end();
+ }
+
+ // Define attributes to read
+ for (uint field_no= 0; field_no < no_fields; field_no++)
+ {
+ Field *field= table->field[field_no];
+
+ // Read attribute
+ DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
+ if (get_ndb_value(op, field_no, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= no_fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+#endif
+
+
+/*
+ Read records from NDB using full table scan
+ */
+
+int ha_ndbcluster::full_table_scan(byte *buf)
+{
+ uint i;
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor;
+ NdbScanOperation *op;
+
+ DBUG_ENTER("full_table_scan");
+ DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
+
+ if (!(op=trans->getNdbScanOperation(m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+ if (!(cursor= op->readTuples(parallelism)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ // Define attributes to read
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ m_value[i]= NULL;
+ }
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= table->fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+
+
+/*
+ Insert one record into NDB
+*/
+
+int ha_ndbcluster::write_row(byte *record)
+{
+ uint i;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ int res;
+ DBUG_ENTER("write_row");
+
+ statistic_increment(ha_write_count,&LOCK_status);
+ if (table->timestamp_default_now)
+ update_timestamp(record+table->timestamp_default_now-1);
+ if (table->next_number_field && record == table->record[0])
+ update_auto_increment();
+
+ if (!(op= trans->getNdbOperation(m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+
+ res= (m_use_write) ? op->writeTuple() :op->insertTuple();
+ if (res != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // Table has hidden primary key
+ Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+ if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+      DBUG_RETURN(res);
+ }
+
+ // Set non-key attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if (!(field->flags & PRI_KEY_FLAG) &&
+ set_ndb_value(op, field, i))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ /*
+ Execute write operation
+ NOTE When doing inserts with many values in
+ each INSERT statement it should not be necessary
+ to NoCommit the transaction between each row.
+ Find out how this is detected!
+ */
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_RETURN(0);
+}
+
+
+/* Compare if a key in a row has changed */
+
+int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row,
+ const byte * new_row)
+{
+ KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
+ KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;
+
+ for (; key_part != end ; key_part++)
+ {
+ if (key_part->null_bit)
+ {
+ if ((old_row[key_part->null_offset] & key_part->null_bit) !=
+ (new_row[key_part->null_offset] & key_part->null_bit))
+ return 1;
+ }
+ if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH))
+ {
+
+ if (key_part->field->cmp_binary((char*) (old_row + key_part->offset),
+ (char*) (new_row + key_part->offset),
+ (ulong) key_part->length))
+ return 1;
+ }
+ else
+ {
+ if (memcmp(old_row+key_part->offset, new_row+key_part->offset,
+ key_part->length))
+ return 1;
+ }
+ }
+ return 0;
+}
+
+/*
+ Update one record in NDB using primary key
+*/
+
+int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
+{
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ uint i;
+ DBUG_ENTER("update_row");
+
+ statistic_increment(ha_update_count,&LOCK_status);
+ if (table->timestamp_on_update_now)
+ update_timestamp(new_data+table->timestamp_on_update_now-1);
+
+ if (!(op= trans->getNdbOperation(m_tabname)) ||
+ op->updateTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+
+ // Require that the PK for this record has previously been
+ // read into m_value
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec);
+ DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ /* Check for update of primary key and return error */
+ if (key_cmp(table->primary_key, old_data, new_data))
+ DBUG_RETURN(HA_ERR_UNSUPPORTED);
+
+ int res;
+ if ((res= set_primary_key(op, old_data + table->null_bytes)))
+ DBUG_RETURN(res);
+ }
+
+ // Set non-key attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) &&
+ (!(field->flags & PRI_KEY_FLAG)) &&
+ set_ndb_value(op, field, i))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ // Execute update operation
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Delete one record from NDB, using primary key
+*/
+
+int ha_ndbcluster::delete_row(const byte *record)
+{
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ DBUG_ENTER("delete_row");
+
+ statistic_increment(ha_delete_count,&LOCK_status);
+
+ if (!(op=trans->getNdbOperation(m_tabname)) ||
+ op->deleteTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec != NULL);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+      DBUG_RETURN(res);
+ }
+
+ // Execute delete operation
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_RETURN(0);
+}
+
+/*
+ Unpack a record read from NDB
+
+ SYNOPSIS
+ unpack_record()
+ buf Buffer to store read row
+
+ NOTE
+ The data for each row is read directly into the
+ destination buffer. This function is primarily
+ called in order to check if any fields should be
+ set to null.
+*/
+
+void ha_ndbcluster::unpack_record(byte* buf)
+{
+ uint row_offset= (uint) (buf - table->record[0]);
+ Field **field, **end;
+ NdbRecAttr **value= m_value;
+ DBUG_ENTER("unpack_record");
+
+ // Set null flag(s)
+ bzero(buf, table->null_bytes);
+ for (field= table->field, end= field+table->fields;
+ field < end;
+ field++, value++)
+ {
+ if (*value && (*value)->isNULL())
+ (*field)->set_null(row_offset);
+ }
+
+#ifndef DBUG_OFF
+  // Read and print all values that were fetched
+ if (table->primary_key == MAX_KEY)
+ {
+ // Table with hidden primary key
+ int hidden_no= table->fields;
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBCOL *hidden_col= tab->getColumn(hidden_no);
+ NdbRecAttr* rec= m_value[hidden_no];
+ DBUG_ASSERT(rec);
+ DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
+ hidden_col->getName(), rec->u_64_value()));
+ }
+ print_results();
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Utility function to print/dump the fetched field
+ */
+
+void ha_ndbcluster::print_results()
+{
+ const NDBTAB *tab= (NDBTAB*) m_table;
+ DBUG_ENTER("print_results");
+
+#ifndef DBUG_OFF
+ if (!_db_on_)
+ DBUG_VOID_RETURN;
+
+ for (uint f=0; f<table->fields;f++)
+ {
+ Field *field;
+ const NDBCOL *col;
+ NdbRecAttr *value;
+
+ if (!(value= m_value[f]))
+ {
+ fprintf(DBUG_FILE, "Field %d was not read\n", f);
+ continue;
+ }
+ field= table->field[f];
+ DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
+ col= tab->getColumn(f);
+ fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
+
+ if (value->isNULL())
+ {
+ fprintf(DBUG_FILE, "NULL\n");
+ continue;
+ }
+
+ switch (col->getType()) {
+ case NdbDictionary::Column::Blob:
+ case NdbDictionary::Column::Undefined:
+ fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
+ break;
+ case NdbDictionary::Column::Tinyint: {
+ char value= *field->ptr;
+ fprintf(DBUG_FILE, "Tinyint\t%d", value);
+ break;
+ }
+ case NdbDictionary::Column::Tinyunsigned: {
+ unsigned char value= *field->ptr;
+ fprintf(DBUG_FILE, "Tinyunsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Smallint: {
+      short value= *(short*) field->ptr;
+ fprintf(DBUG_FILE, "Smallint\t%d", value);
+ break;
+ }
+ case NdbDictionary::Column::Smallunsigned: {
+      unsigned short value= *(unsigned short*) field->ptr;
+ fprintf(DBUG_FILE, "Smallunsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Mediumint: {
+ byte value[3];
+ memcpy(value, field->ptr, 3);
+ fprintf(DBUG_FILE, "Mediumint\t%d,%d,%d", value[0], value[1], value[2]);
+ break;
+ }
+ case NdbDictionary::Column::Mediumunsigned: {
+ byte value[3];
+ memcpy(value, field->ptr, 3);
+ fprintf(DBUG_FILE, "Mediumunsigned\t%u,%u,%u", value[0], value[1], value[2]);
+ break;
+ }
+ case NdbDictionary::Column::Int: {
+ fprintf(DBUG_FILE, "Int\t%lld", field->val_int());
+ break;
+ }
+ case NdbDictionary::Column::Unsigned: {
+      Uint32 value= *(Uint32*) field->ptr;
+ fprintf(DBUG_FILE, "Unsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Bigint: {
+      Int64 value= *(Int64*) field->ptr;
+ fprintf(DBUG_FILE, "Bigint\t%lld", value);
+ break;
+ }
+ case NdbDictionary::Column::Bigunsigned: {
+      Uint64 value= *(Uint64*) field->ptr;
+ fprintf(DBUG_FILE, "Bigunsigned\t%llu", value);
+ break;
+ }
+ case NdbDictionary::Column::Float: {
+      float value= *(float*) field->ptr;
+ fprintf(DBUG_FILE, "Float\t%f", value);
+ break;
+ }
+ case NdbDictionary::Column::Double: {
+      double value= *(double*) field->ptr;
+ fprintf(DBUG_FILE, "Double\t%f", value);
+ break;
+ }
+ case NdbDictionary::Column::Decimal: {
+ char *value= field->ptr;
+
+ fprintf(DBUG_FILE, "Decimal\t'%-*s'", field->pack_length(), value);
+ break;
+ }
+ case NdbDictionary::Column::Char:{
+ char buf[field->pack_length()+1];
+ char *value= (char *) field->ptr;
+      snprintf(buf, sizeof(buf), "%.*s", (int) field->pack_length(), value);
+ fprintf(DBUG_FILE, "Char\t'%s'", buf);
+ break;
+ }
+ case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Binary:
+ case NdbDictionary::Column::Varbinary: {
+ char *value= (char *) field->ptr;
+ fprintf(DBUG_FILE, "'%s'", value);
+ break;
+ }
+ case NdbDictionary::Column::Datetime: {
+      Uint64 value= *(Uint64*) field->ptr;
+ fprintf(DBUG_FILE, "Datetime\t%llu", value);
+ break;
+ }
+ case NdbDictionary::Column::Timespec: {
+      Uint64 value= *(Uint64*) field->ptr;
+ fprintf(DBUG_FILE, "Timespec\t%llu", value);
+ break;
+ }
+ }
+ fprintf(DBUG_FILE, "\n");
+
+ }
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+int ha_ndbcluster::index_init(uint index)
+{
+ DBUG_ENTER("index_init");
+ DBUG_PRINT("enter", ("index: %u", index));
+ DBUG_RETURN(handler::index_init(index));
+}
+
+
+int ha_ndbcluster::index_end()
+{
+ DBUG_ENTER("index_end");
+ DBUG_RETURN(rnd_end());
+}
+
+
+int ha_ndbcluster::index_read(byte *buf,
+ const byte *key, uint key_len,
+ enum ha_rkey_function find_flag)
+{
+ DBUG_ENTER("index_read");
+ DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
+ active_index, key_len, find_flag));
+
+ int error= 1;
+ statistic_increment(ha_read_key_count, &LOCK_status);
+
+ switch (get_index_type(active_index)){
+ case PRIMARY_KEY_INDEX:
+ error= pk_read(key, key_len, buf);
+ break;
+
+ case UNIQUE_INDEX:
+ error= unique_index_read(key, key_len, buf);
+ break;
+
+ case ORDERED_INDEX:
+ error= ordered_index_scan(key, key_len, buf, find_flag);
+ break;
+
+ default:
+ case UNDEFINED_INDEX:
+ break;
+ }
+ DBUG_RETURN(error);
+}
+
+
+int ha_ndbcluster::index_read_idx(byte *buf, uint index_no,
+ const byte *key, uint key_len,
+ enum ha_rkey_function find_flag)
+{
+ statistic_increment(ha_read_key_count,&LOCK_status);
+ DBUG_ENTER("index_read_idx");
+ DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));
+ index_init(index_no);
+ DBUG_RETURN(index_read(buf, key, key_len, find_flag));
+}
+
+
+int ha_ndbcluster::index_next(byte *buf)
+{
+ DBUG_ENTER("index_next");
+
+ int error = 1;
+ statistic_increment(ha_read_next_count,&LOCK_status);
+ if (get_index_type(active_index) == PRIMARY_KEY_INDEX)
+ error= HA_ERR_END_OF_FILE;
+ else
+ error = next_result(buf);
+ DBUG_RETURN(error);
+}
+
+
+int ha_ndbcluster::index_prev(byte *buf)
+{
+ DBUG_ENTER("index_prev");
+ statistic_increment(ha_read_prev_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::index_first(byte *buf)
+{
+ DBUG_ENTER("index_first");
+ statistic_increment(ha_read_first_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::index_last(byte *buf)
+{
+ DBUG_ENTER("index_last");
+ statistic_increment(ha_read_last_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::rnd_init(bool scan)
+{
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("rnd_init");
+ DBUG_PRINT("enter", ("scan: %d", scan));
+ // Check that cursor is not defined
+ if (cursor)
+ DBUG_RETURN(1);
+ index_init(table->primary_key);
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::rnd_end()
+{
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("rnd_end");
+
+ if (cursor)
+ {
+ DBUG_PRINT("info", ("Closing the cursor"));
+ cursor->close();
+ m_active_cursor= NULL;
+ }
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::rnd_next(byte *buf)
+{
+ DBUG_ENTER("rnd_next");
+ statistic_increment(ha_read_rnd_next_count, &LOCK_status);
+ int error = 1;
+ if (!m_active_cursor)
+ error = full_table_scan(buf);
+ else
+ error = next_result(buf);
+ DBUG_RETURN(error);
+}
+
+
+/*
+ An "interesting" record has been found and it's pk
+ retrieved by calling position
+ Now it's time to read the record from db once
+ again
+*/
+
+int ha_ndbcluster::rnd_pos(byte *buf, byte *pos)
+{
+ DBUG_ENTER("rnd_pos");
+ statistic_increment(ha_read_rnd_count,&LOCK_status);
+ // The primary key for the record is stored in pos
+ // Perform a pk_read using primary key "index"
+ DBUG_RETURN(pk_read(pos, ref_length, buf));
+}
+
+
+/*
+ Store the primary key of this record in ref
+ variable, so that the row can be retrieved again later
+ using "reference" in rnd_pos
+*/
+
+void ha_ndbcluster::position(const byte *record)
+{
+ KEY *key_info;
+ KEY_PART_INFO *key_part;
+ KEY_PART_INFO *end;
+ byte *buff;
+ DBUG_ENTER("position");
+
+ if (table->primary_key != MAX_KEY)
+ {
+ key_info= table->key_info + table->primary_key;
+ key_part= key_info->key_part;
+ end= key_part + key_info->key_parts;
+ buff= ref;
+
+ for (; key_part != end; key_part++)
+ {
+ if (key_part->null_bit) {
+ /* Store 0 if the key part is a NULL part */
+ if (record[key_part->null_offset]
+ & key_part->null_bit) {
+ *buff++= 1;
+ continue;
+ }
+ *buff++= 0;
+ }
+ memcpy(buff, record + key_part->offset, key_part->length);
+ buff += key_part->length;
+ }
+ }
+ else
+ {
+ // No primary key, get hidden key
+ DBUG_PRINT("info", ("Getting hidden key"));
+ int hidden_no= table->fields;
+ NdbRecAttr* rec= m_value[hidden_no];
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBCOL *hidden_col= tab->getColumn(hidden_no);
+ DBUG_ASSERT(hidden_col->getPrimaryKey() &&
+ hidden_col->getAutoIncrement() &&
+ rec != NULL &&
+ ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+ memcpy(ref, (const void*)rec->aRef(), ref_length);
+ }
+
+ DBUG_DUMP("ref", (char*)ref, ref_length);
+ DBUG_VOID_RETURN;
+}
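+
+/*
+  Calling pattern sketch (illustrative only): MySQL saves the position
+  of the current row and later re-reads it through rnd_pos(), which in
+  this handler turns into a pk_read() on the saved (possibly hidden) key.
+
+    h->position(table->record[0]);   // store pk of current row into h->ref
+    ...
+    h->rnd_pos(buf, h->ref);         // re-read that row later
+*/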
+
+
+void ha_ndbcluster::info(uint flag)
+{
+ DBUG_ENTER("info");
+ DBUG_PRINT("enter", ("flag: %d", flag));
+
+ if (flag & HA_STATUS_POS)
+ DBUG_PRINT("info", ("HA_STATUS_POS"));
+ if (flag & HA_STATUS_NO_LOCK)
+ DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
+ if (flag & HA_STATUS_TIME)
+ DBUG_PRINT("info", ("HA_STATUS_TIME"));
+ if (flag & HA_STATUS_CONST)
+ DBUG_PRINT("info", ("HA_STATUS_CONST"));
+ if (flag & HA_STATUS_VARIABLE)
+ DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
+ if (flag & HA_STATUS_ERRKEY)
+ DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
+ if (flag & HA_STATUS_AUTO)
+ DBUG_PRINT("info", ("HA_STATUS_AUTO"));
+ DBUG_VOID_RETURN;
+}
+
+
+int ha_ndbcluster::extra(enum ha_extra_function operation)
+{
+ DBUG_ENTER("extra");
+ switch (operation) {
+ case HA_EXTRA_NORMAL: /* Optimize for space (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NORMAL"));
+ break;
+ case HA_EXTRA_QUICK: /* Optimize for speed */
+ DBUG_PRINT("info", ("HA_EXTRA_QUICK"));
+ break;
+ case HA_EXTRA_RESET: /* Reset database to after open */
+ DBUG_PRINT("info", ("HA_EXTRA_RESET"));
+ break;
+  case HA_EXTRA_CACHE: /* Cache record in HA_rrnd() */
+ DBUG_PRINT("info", ("HA_EXTRA_CACHE"));
+ break;
+  case HA_EXTRA_NO_CACHE: /* End caching of records (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE"));
+ break;
+ case HA_EXTRA_NO_READCHECK: /* No readcheck on update */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK"));
+ break;
+ case HA_EXTRA_READCHECK: /* Use readcheck (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_READCHECK"));
+ break;
+ case HA_EXTRA_KEYREAD: /* Read only key to database */
+ DBUG_PRINT("info", ("HA_EXTRA_KEYREAD"));
+ break;
+ case HA_EXTRA_NO_KEYREAD: /* Normal read of records (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD"));
+ break;
+ case HA_EXTRA_NO_USER_CHANGE: /* No user is allowed to write */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE"));
+ break;
+ case HA_EXTRA_KEY_CACHE:
+ DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE"));
+ break;
+ case HA_EXTRA_NO_KEY_CACHE:
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE"));
+ break;
+  case HA_EXTRA_WAIT_LOCK: /* Wait until file is available (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK"));
+ break;
+ case HA_EXTRA_NO_WAIT_LOCK: /* If file is locked, return quickly */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK"));
+ break;
+ case HA_EXTRA_WRITE_CACHE: /* Use write cache in ha_write() */
+ DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE"));
+ break;
+ case HA_EXTRA_FLUSH_CACHE: /* flush write_record_cache */
+ DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE"));
+ break;
+ case HA_EXTRA_NO_KEYS: /* Remove all update of keys */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS"));
+ break;
+ case HA_EXTRA_KEYREAD_CHANGE_POS: /* Keyread, but change pos */
+ DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */
+ break;
+ case HA_EXTRA_REMEMBER_POS: /* Remember pos for next/prev */
+ DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS"));
+ break;
+ case HA_EXTRA_RESTORE_POS:
+ DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS"));
+ break;
+ case HA_EXTRA_REINIT_CACHE: /* init cache from current record */
+ DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE"));
+ break;
+ case HA_EXTRA_FORCE_REOPEN: /* Datafile have changed on disk */
+ DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN"));
+ break;
+ case HA_EXTRA_FLUSH: /* Flush tables to disk */
+ DBUG_PRINT("info", ("HA_EXTRA_FLUSH"));
+ break;
+ case HA_EXTRA_NO_ROWS: /* Don't write rows */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS"));
+ break;
+ case HA_EXTRA_RESET_STATE: /* Reset positions */
+ DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE"));
+ break;
+ case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
+ DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
+
+ DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
+ m_use_write= TRUE;
+ break;
+ case HA_EXTRA_NO_IGNORE_DUP_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
+ DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
+ m_use_write= false;
+ break;
+ case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those
+ where field->query_id is the same as
+ the current query id */
+ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
+ break;
+ case HA_EXTRA_PREPARE_FOR_DELETE:
+ DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
+ break;
+ case HA_EXTRA_PREPARE_FOR_UPDATE: /* Remove read cache if problems */
+ DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE"));
+ break;
+ case HA_EXTRA_PRELOAD_BUFFER_SIZE:
+ DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE"));
+ break;
+ case HA_EXTRA_RETRIEVE_PRIMARY_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY"));
+ break;
+ case HA_EXTRA_CHANGE_KEY_TO_UNIQUE:
+ DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE"));
+ break;
+ case HA_EXTRA_CHANGE_KEY_TO_DUP:
+ DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP"));
+ break;
+
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
+{
+ DBUG_ENTER("extra_opt");
+ DBUG_PRINT("enter", ("cache_size: %d", cache_size));
+ DBUG_RETURN(extra(operation));
+}
+
+
+int ha_ndbcluster::reset()
+{
+ DBUG_ENTER("reset");
+ // Reset what?
+ DBUG_RETURN(1);
+}
+
+
+const char **ha_ndbcluster::bas_ext() const
+{ static const char *ext[1] = { NullS }; return ext; }
+
+
+/*
+ How many seeks it will take to read through the table
+ This is to be comparable to the number returned by records_in_range so
+ that we can decide if we should scan the table or use keys.
+*/
+
+double ha_ndbcluster::scan_time()
+{
+ return rows2double(records/3);
+}
+
+
+THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
+ THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type)
+{
+ DBUG_ENTER("store_lock");
+
+ if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK)
+ {
+
+ /* If we are not doing a LOCK TABLE, then allow multiple
+ writers */
+
+ if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
+ lock_type <= TL_WRITE) && !thd->in_lock_tables)
+ lock_type= TL_WRITE_ALLOW_WRITE;
+
+ /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
+ MySQL would use the lock TL_READ_NO_INSERT on t2, and that
+ would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
+ to t2. Convert the lock to a normal read lock to allow
+ concurrent inserts to t2. */
+
+ if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
+ lock_type= TL_READ;
+
+ m_lock.type=lock_type;
+ }
+ *to++= &m_lock;
+
+ DBUG_RETURN(to);
+}
+
+#ifndef DBUG_OFF
+#define PRINT_OPTION_FLAGS(t) { \
+ if (t->options & OPTION_NOT_AUTOCOMMIT) \
+ DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); \
+ if (t->options & OPTION_BEGIN) \
+ DBUG_PRINT("thd->options", ("OPTION_BEGIN")); \
+ if (t->options & OPTION_TABLE_LOCK) \
+ DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); \
+}
+#else
+#define PRINT_OPTION_FLAGS(t)
+#endif
+
+
+/*
+  As MySQL will execute an external lock for every new table it uses,
+  we can use this to start the transactions.
+  If we are in auto_commit mode, we just need to start a transaction
+  for the statement; this will be stored in transaction.stmt.
+  If not, we have to start a master transaction if one does not
+  already exist; this will be stored in transaction.all.
+
+  When a table lock is held, one transaction is started to hold
+  the table lock, and for each statement a "hupp" transaction is started.
+ */
+
+int ha_ndbcluster::external_lock(THD *thd, int lock_type)
+{
+ int error=0;
+ NdbConnection* trans= NULL;
+
+ DBUG_ENTER("external_lock");
+ DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
+ thd->transaction.ndb_lock_count));
+
+ /*
+ Check that this handler instance has a connection
+ set up to the Ndb object of thd
+ */
+ if (check_ndb_connection())
+ DBUG_RETURN(1);
+
+ if (lock_type != F_UNLCK)
+ {
+ if (!thd->transaction.ndb_lock_count++)
+ {
+ PRINT_OPTION_FLAGS(thd);
+
+ if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK)))
+ {
+ // Autocommit transaction
+ DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
+ DBUG_PRINT("trans",("Starting transaction stmt"));
+
+ trans= m_ndb->startTransaction();
+ if (trans == NULL)
+ {
+ thd->transaction.ndb_lock_count--; // We didn't get the lock
+ ERR_RETURN(m_ndb->getNdbError());
+ }
+ thd->transaction.stmt.ndb_tid= trans;
+ }
+ else
+ {
+ if (!thd->transaction.all.ndb_tid)
+ {
+ // Not autocommit transaction
+ // A "master" transaction ha not been started yet
+ DBUG_PRINT("trans",("starting transaction, all"));
+
+ trans= m_ndb->startTransaction();
+ if (trans == NULL)
+ {
+ thd->transaction.ndb_lock_count--; // We didn't get the lock
+ ERR_RETURN(m_ndb->getNdbError());
+ }
+
+ /*
+            If this is the start of a LOCK TABLE, a table lock
+            should be taken on the table in NDB
+
+ Check if it should be read or write lock
+ */
+ if (thd->options & (OPTION_TABLE_LOCK))
+ {
+ //lockThisTable();
+ DBUG_PRINT("info", ("Locking the table..." ));
+ }
+
+ thd->transaction.all.ndb_tid= trans;
+ }
+ }
+ }
+ /*
+ This is the place to make sure this handler instance
+ has a started transaction.
+
+ The transaction is started by the first handler on which
+ MySQL Server calls external lock
+
+ Other handlers in the same stmt or transaction should use
+ the same NDB transaction. This is done by setting up the m_active_trans
+ pointer to point to the NDB transaction.
+ */
+
+ m_active_trans= thd->transaction.all.ndb_tid ?
+ (NdbConnection*)thd->transaction.all.ndb_tid:
+ (NdbConnection*)thd->transaction.stmt.ndb_tid;
+ DBUG_ASSERT(m_active_trans);
+
+ }
+ else
+ {
+ if (!--thd->transaction.ndb_lock_count)
+ {
+ DBUG_PRINT("trans", ("Last external_lock"));
+ PRINT_OPTION_FLAGS(thd);
+
+ if (thd->transaction.stmt.ndb_tid)
+ {
+ /*
+ Unlock is done without a transaction commit / rollback.
+ This happens if the thread didn't update any rows
+ We must in this case close the transaction to release resources
+ */
+ DBUG_PRINT("trans",("ending non-updating transaction"));
+ m_ndb->closeTransaction(m_active_trans);
+ thd->transaction.stmt.ndb_tid= 0;
+ }
+ }
+ m_active_trans= NULL;
+ }
+ DBUG_RETURN(error);
+}
+
+/*
+  When using LOCK TABLES, external_lock is only called when the actual
+  table lock is taken.
+  Under LOCK TABLES, each used table will force a call to start_stmt.
+*/
+
+int ha_ndbcluster::start_stmt(THD *thd)
+{
+ int error=0;
+ DBUG_ENTER("start_stmt");
+ PRINT_OPTION_FLAGS(thd);
+
+ NdbConnection *trans= (NdbConnection*)thd->transaction.stmt.ndb_tid;
+ if (!trans){
+ DBUG_PRINT("trans",("Starting transaction stmt"));
+
+ NdbConnection *tablock_trans=
+ (NdbConnection*)thd->transaction.all.ndb_tid;
+ DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
+    DBUG_ASSERT(tablock_trans);
+    trans= m_ndb->hupp(tablock_trans);
+ if (trans == NULL)
+ ERR_RETURN(m_ndb->getNdbError());
+ thd->transaction.stmt.ndb_tid= trans;
+ }
+ m_active_trans= trans;
+
+ DBUG_RETURN(error);
+}
+
+
+/*
+ Commit a transaction started in NDB
+ */
+
+int ndbcluster_commit(THD *thd, void *ndb_transaction)
+{
+ int res= 0;
+ Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+ DBUG_ENTER("ndbcluster_commit");
+ DBUG_PRINT("transaction",("%s",
+ trans == thd->transaction.stmt.ndb_tid ?
+ "stmt" : "all"));
+ DBUG_ASSERT(ndb && trans);
+
+ if (trans->execute(Commit) != 0)
+ {
+ const NdbError err= trans->getNdbError();
+ ERR_PRINT(err);
+ res= ndb_to_mysql_error(&err);
+ }
+ ndb->closeTransaction(trans);
+ DBUG_RETURN(res);
+}
+
+
+/*
+ Rollback a transaction started in NDB
+ */
+
+int ndbcluster_rollback(THD *thd, void *ndb_transaction)
+{
+ int res= 0;
+ Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+ DBUG_ENTER("ndbcluster_rollback");
+ DBUG_PRINT("transaction",("%s",
+ trans == thd->transaction.stmt.ndb_tid ?
+ "stmt" : "all"));
+ DBUG_ASSERT(ndb && trans);
+
+ if (trans->execute(Rollback) != 0)
+ {
+ const NdbError err= trans->getNdbError();
+ ERR_PRINT(err);
+ res= ndb_to_mysql_error(&err);
+ }
+ ndb->closeTransaction(trans);
+  DBUG_RETURN(res);
+}
+
+
+/*
+ Map MySQL type to the corresponding NDB type
+ */
+
+inline NdbDictionary::Column::Type
+mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
+{
+ switch(mysql_type) {
+ case MYSQL_TYPE_DECIMAL:
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_TINY:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Tinyunsigned :
+ NdbDictionary::Column::Tinyint;
+ case MYSQL_TYPE_SHORT:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Smallunsigned :
+ NdbDictionary::Column::Smallint;
+ case MYSQL_TYPE_LONG:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Unsigned :
+ NdbDictionary::Column::Int;
+ case MYSQL_TYPE_TIMESTAMP:
+ return NdbDictionary::Column::Unsigned;
+ case MYSQL_TYPE_LONGLONG:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Bigunsigned :
+ NdbDictionary::Column::Bigint;
+ case MYSQL_TYPE_INT24:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Mediumunsigned :
+ NdbDictionary::Column::Mediumint;
+ case MYSQL_TYPE_FLOAT:
+ return NdbDictionary::Column::Float;
+ case MYSQL_TYPE_DOUBLE:
+ return NdbDictionary::Column::Double;
+ case MYSQL_TYPE_DATETIME :
+ return NdbDictionary::Column::Datetime;
+ case MYSQL_TYPE_DATE :
+ case MYSQL_TYPE_NEWDATE :
+ case MYSQL_TYPE_TIME :
+ case MYSQL_TYPE_YEAR :
+ // Missing NDB data types, mapped to char
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_ENUM :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_SET :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_TINY_BLOB :
+ case MYSQL_TYPE_MEDIUM_BLOB :
+ case MYSQL_TYPE_LONG_BLOB :
+ case MYSQL_TYPE_BLOB :
+ return NdbDictionary::Column::Blob;
+ case MYSQL_TYPE_VAR_STRING :
+ return NdbDictionary::Column::Varchar;
+ case MYSQL_TYPE_STRING :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_NULL :
+ case MYSQL_TYPE_GEOMETRY :
+ return NdbDictionary::Column::Undefined;
+ }
+ return NdbDictionary::Column::Undefined;
+}
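+
+/*
+ For example, a column declared INT UNSIGNED maps as
+
+   mysql_to_ndb_type(MYSQL_TYPE_LONG, true) == NdbDictionary::Column::Unsigned
+
+ while types NDB lacks (DATE, TIME, YEAR, ENUM, SET) fall back to Char.
+*/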
+
+
+/*
+ Create a table in NDB Cluster
+ */
+
+int ha_ndbcluster::create(const char *name,
+ TABLE *form,
+ HA_CREATE_INFO *info)
+{
+ NDBTAB tab;
+ NdbDictionary::Column::Type ndb_type;
+ NDBCOL col;
+ uint pack_length, length, i;
+ int res;
+ const void *data, *pack_data;
+ const char **key_name= form->keynames.type_names;
+ char name2[FN_HEADLEN];
+
+ DBUG_ENTER("create");
+ DBUG_PRINT("enter", ("name: %s", name));
+ fn_format(name2, name, "", "",2); // Remove the .frm extension
+ set_dbname(name2);
+ set_tabname(name2);
+
+ DBUG_PRINT("table", ("name: %s", m_tabname));
+ tab.setName(m_tabname);
+ tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
+
+ // Save frm data for this table
+ if (readfrm(name, &data, &length))
+ DBUG_RETURN(1);
+ if (packfrm(data, length, &pack_data, &pack_length))
+ DBUG_RETURN(2);
+
+ DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
+ tab.setFrm(pack_data, pack_length);
+ my_free((char*)data, MYF(0));
+ my_free((char*)pack_data, MYF(0));
+
+ for (i= 0; i < form->fields; i++)
+ {
+ Field *field= form->field[i];
+ ndb_type= mysql_to_ndb_type(field->real_type(),
+ field->flags & UNSIGNED_FLAG);
+ DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
+ field->field_name, field->real_type(),
+ field->pack_length()));
+ col.setName(field->field_name);
+ col.setType(ndb_type);
+ if ((ndb_type == NdbDictionary::Column::Char) ||
+ (ndb_type == NdbDictionary::Column::Varchar))
+ col.setLength(field->pack_length());
+ else
+ col.setLength(1);
+ col.setNullable(field->maybe_null());
+ col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
+ if (field->flags & AUTO_INCREMENT_FLAG)
+ {
+ DBUG_PRINT("info", ("Found auto_increment key"));
+ col.setAutoIncrement(TRUE);
+ ulonglong value= info->auto_increment_value ?
+ info->auto_increment_value - 1 :
+ (ulonglong) 0;
+ DBUG_PRINT("info", ("initial value: %lu", (ulong) value));
+// col.setInitialAutIncValue(value);
+ }
+ else
+ col.setAutoIncrement(false);
+
+ tab.addColumn(col);
+ }
+
+ // No primary key, create shadow key as 64 bit, auto increment
+ if (form->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Generating shadow key"));
+ col.setName("$PK");
+ col.setType(NdbDictionary::Column::Bigunsigned);
+ col.setLength(1);
+ col.setNullable(false);
+ col.setPrimaryKey(TRUE);
+ col.setAutoIncrement(TRUE);
+ tab.addColumn(col);
+ }
+
+ my_errno= 0;
+ if (check_ndb_connection())
+ {
+ my_errno= HA_ERR_NO_CONNECTION;
+ DBUG_RETURN(my_errno);
+ }
+
+ // Create the table in NDB
+ NDBDICT *dict= m_ndb->getDictionary();
+ if (dict->createTable(tab))
+ {
+ const NdbError err= dict->getNdbError();
+ ERR_PRINT(err);
+ my_errno= ndb_to_mysql_error(&err);
+ DBUG_RETURN(my_errno);
+ }
+ DBUG_PRINT("info", ("Table %s/%s created successfully",
+ m_dbname, m_tabname));
+
+ // Fetch table from NDB, check that it exists
+ const NDBTAB *tab2= dict->getTable(m_tabname);
+ if (tab2 == NULL)
+ {
+ const NdbError err= dict->getNdbError();
+ ERR_PRINT(err);
+ my_errno= ndb_to_mysql_error(&err);
+ DBUG_RETURN(my_errno);
+ }
+
+ // Create secondary indexes
+ for (i= 0; i < form->keys; i++)
+ {
+ DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i]));
+ if (i == form->primary_key)
+ {
+ DBUG_PRINT("info", ("Skipping it, PK already created"));
+ continue;
+ }
+
+ DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i]));
+ res= create_index(key_name[i],
+ form->key_info + i);
+ switch(res){
+ case 0:
+ // OK
+ break;
+ default:
+ DBUG_PRINT("error", ("Failed to create index %u", i));
+ drop_table();
+ my_errno= res;
+ goto err_end;
+ }
+ }
+
+err_end:
+ DBUG_RETURN(my_errno);
+}
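+
+/*
+ Illustrative flow for a statement such as
+
+   CREATE TABLE t1 (a INT PRIMARY KEY, b CHAR(20), KEY(b)) ENGINE=NDBCLUSTER;
+
+ create() packs the frm with packfrm() and attaches it to the NDB table
+ via setFrm() so that other MySQL servers can discover the table, creates
+ the table through NdbDictionary and finally builds the secondary index
+ on b with create_index().
+*/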
+
+
+/*
+ Create an index in NDB Cluster
+ */
+
+int ha_ndbcluster::create_index(const char *name,
+ KEY *key_info)
+{
+ NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+ KEY_PART_INFO *key_part= key_info->key_part;
+ KEY_PART_INFO *end= key_part + key_info->key_parts;
+
+ DBUG_ENTER("create_index");
+ DBUG_PRINT("enter", ("name: %s ", name));
+
+ // Check that an index with the same name does not already exist
+ if (dict->getIndex(name, m_tabname))
+ ERR_RETURN(dict->getNdbError());
+
+ NdbDictionary::Index ndb_index(name);
+ if (key_info->flags & HA_NOSAME)
+ ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
+ else
+ {
+ ndb_index.setType(NdbDictionary::Index::OrderedIndex);
+ // TODO Only temporary ordered indexes supported
+ ndb_index.setLogging(false);
+ }
+ ndb_index.setTable(m_tabname);
+
+ for (; key_part != end; key_part++)
+ {
+ Field *field= key_part->field;
+ DBUG_PRINT("info", ("attr: %s", field->field_name));
+ ndb_index.addColumnName(field->field_name);
+ }
+
+ if (dict->createIndex(ndb_index))
+ ERR_RETURN(dict->getNdbError());
+
+ // Success
+ DBUG_PRINT("info", ("Created index %s", name));
+ DBUG_RETURN(0);
+}
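+
+/*
+ Sketch of the two index flavours created above, using a hypothetical
+ index name: UNIQUE KEY(b) gives a UniqueHashIndex, plain KEY(b) a
+ (currently non-logging) OrderedIndex:
+
+   NdbDictionary::Index idx("b_unique");
+   idx.setType(NdbDictionary::Index::UniqueHashIndex);
+   idx.setTable(m_tabname);
+   idx.addColumnName("b");
+*/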
+
+
+/*
+ Rename a table in NDB Cluster
+*/
+
+int ha_ndbcluster::rename_table(const char *from, const char *to)
+{
+ char new_tabname[FN_HEADLEN];
+
+ DBUG_ENTER("ha_ndbcluster::rename_table");
+ set_dbname(from);
+ set_tabname(from);
+ set_tabname(to, new_tabname);
+
+ if (check_ndb_connection()) {
+ my_errno= HA_ERR_NO_CONNECTION;
+ DBUG_RETURN(my_errno);
+ }
+
+ int result= alter_table_name(m_tabname, new_tabname);
+ if (result == 0)
+ set_tabname(to);
+
+ DBUG_RETURN(result);
+}
+
+
+/*
+ Rename a table in NDB Cluster using alter table
+ */
+
+int ha_ndbcluster::alter_table_name(const char *from, const char *to)
+{
+ NDBDICT *dict= m_ndb->getDictionary();
+ const NDBTAB *orig_tab;
+ DBUG_ENTER("alter_table_name_table");
+ DBUG_PRINT("enter", ("Renaming %s to %s", from, to));
+
+ if (!(orig_tab= dict->getTable(from)))
+ ERR_RETURN(dict->getNdbError());
+
+ NdbDictionary::Table copy_tab= dict->getTableForAlteration(from);
+ copy_tab.setName(to);
+ if (dict->alterTable(copy_tab) != 0)
+ ERR_RETURN(dict->getNdbError());
+
+ m_table= NULL;
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Delete a table from NDB Cluster
+ */
+
+int ha_ndbcluster::delete_table(const char *name)
+{
+ DBUG_ENTER("delete_table");
+ DBUG_PRINT("enter", ("name: %s", name));
+ set_dbname(name);
+ set_tabname(name);
+
+ if (check_ndb_connection())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ DBUG_RETURN(drop_table());
+}
+
+
+/*
+ Drop a table in NDB Cluster
+ */
+
+int ha_ndbcluster::drop_table()
+{
+ NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+
+ DBUG_ENTER("drop_table");
+ DBUG_PRINT("enter", ("Deleting %s", m_tabname));
+
+ if (dict->dropTable(m_tabname))
+ {
+ const NdbError err= dict->getNdbError();
+ if (err.code != 709) // 709: No such table existed
+ ERR_RETURN(dict->getNdbError());
+ }
+ release_metadata();
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Drop a database in NDB Cluster
+ */
+
+int ndbcluster_drop_database(const char *path)
+{
+ DBUG_ENTER("ndbcluster_drop_database");
+ // TODO drop all tables for this database
+ DBUG_RETURN(1);
+}
+
+
+longlong ha_ndbcluster::get_auto_increment()
+{
+ // NOTE: if the number of values to be inserted were known,
+ // the autoincrement cache could be used here
+ Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+ return (longlong)auto_value;
+}
+
+
+/*
+ Constructor for the NDB Cluster table handler
+ */
+
+ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
+ handler(table_arg),
+ m_active_trans(NULL),
+ m_active_cursor(NULL),
+ m_ndb(NULL),
+ m_table(NULL),
+ m_table_flags(HA_REC_NOT_IN_SEQ |
+ HA_KEYPOS_TO_RNDPOS |
+ HA_NOT_EXACT_COUNT |
+ HA_NO_WRITE_DELAYED |
+ HA_NO_PREFIX_CHAR_KEYS |
+ HA_NO_BLOBS |
+ HA_DROP_BEFORE_CREATE |
+ HA_NOT_READ_AFTER_KEY),
+ m_use_write(false)
+{
+
+ DBUG_ENTER("ha_ndbcluster");
+
+ m_tabname[0]= '\0';
+ m_dbname[0]= '\0';
+
+ // TODO Adjust number of records and other parameters for proper
+ // selection of scan/pk access
+ records= 100;
+ block_size= 1024;
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Destructor for NDB Cluster table handler
+ */
+
+ha_ndbcluster::~ha_ndbcluster()
+{
+ DBUG_ENTER("~ha_ndbcluster");
+
+ release_metadata();
+
+ // Check for open cursor/transaction
+ DBUG_ASSERT(m_active_cursor == NULL);
+ DBUG_ASSERT(m_active_trans == NULL);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Open a table for further use
+ - fetch metadata for this table from NDB
+ - check that table exists
+*/
+
+int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
+{
+ KEY *key;
+ DBUG_ENTER("open");
+ DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
+ name, mode, test_if_locked));
+
+ // Setup ref_length to make room for the whole
+ // primary key to be written in the ref variable
+
+ if (table->primary_key != MAX_KEY)
+ {
+ key= table->key_info+table->primary_key;
+ ref_length= key->key_length;
+ DBUG_PRINT("info", (" ref_length: %d", ref_length));
+ }
+ // Init table lock structure
+ if (!(m_share=get_share(name)))
+ DBUG_RETURN(1);
+ thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
+
+ set_dbname(name);
+ set_tabname(name);
+
+ if (check_ndb_connection())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ DBUG_RETURN(get_metadata(name));
+}
+
+
+/*
+ Close the table
+ - release resources setup by open()
+ */
+
+int ha_ndbcluster::close(void)
+{
+ DBUG_ENTER("close");
+ free_share(m_share);
+ release_metadata();
+ m_ndb= NULL;
+ DBUG_RETURN(0);
+}
+
+
+Ndb* ha_ndbcluster::seize_ndb()
+{
+ Ndb* ndb;
+ DBUG_ENTER("seize_ndb");
+
+#ifdef USE_NDB_POOL
+ // Seize from pool
+ ndb= Ndb::seize();
+#else
+ ndb= new Ndb("");
+#endif
+ if (ndb->init(NDB_MAX_TRANSACTIONS) != 0)
+ {
+ ERR_PRINT(ndb->getNdbError());
+ /*
+ TODO
+ Alt.1 If init fails because too many Ndb objects are allocated,
+ wait on a condition for an Ndb object to be released.
+ Alt.2 Seize/release from pool, wait until next release
+ */
+ delete ndb;
+ ndb= NULL;
+ }
+ DBUG_RETURN(ndb);
+}
+
+
+void ha_ndbcluster::release_ndb(Ndb* ndb)
+{
+ DBUG_ENTER("release_ndb");
+#ifdef USE_NDB_POOL
+ // Release to pool
+ Ndb::release(ndb);
+#else
+ delete ndb;
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ If this thread already has an Ndb object allocated
+ in the current THD, reuse it. Otherwise
+ seize an Ndb object, assign it to the current THD and use it.
+
+ Having an Ndb object also means that a connection to
+ the NDB cluster has been opened. The connection is
+ checked here.
+*/
+
+int ha_ndbcluster::check_ndb_connection()
+{
+ THD* thd= current_thd;
+ Ndb* ndb;
+ DBUG_ENTER("check_ndb_connection");
+
+ if (!thd->transaction.ndb)
+ {
+ ndb= seize_ndb();
+ if (!ndb)
+ DBUG_RETURN(2);
+ thd->transaction.ndb= ndb;
+ }
+ m_ndb= (Ndb*)thd->transaction.ndb;
+ m_ndb->setDatabaseName(m_dbname);
+ if (m_ndb->waitUntilReady() != 0)
+ {
+ DBUG_PRINT("error", ("Ndb was not ready"));
+ DBUG_RETURN(3);
+ }
+ DBUG_RETURN(0);
+}
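+
+/*
+ A minimal sketch of the per-connection Ndb lifecycle described above:
+
+   // first NDB table touched by this connection
+   thd->transaction.ndb= seize_ndb();
+   ...
+   // client disconnects
+   ndbcluster_close_connection(thd); // release_ndb() and reset pointer
+*/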
+
+void ndbcluster_close_connection(THD *thd)
+{
+ Ndb* ndb;
+ DBUG_ENTER("ndbcluster_close_connection");
+ ndb= (Ndb*)thd->transaction.ndb;
+ ha_ndbcluster::release_ndb(ndb);
+ thd->transaction.ndb= NULL;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Try to discover one table from NDB
+ */
+
+int ndbcluster_discover(const char *dbname, const char *name,
+ const void** frmblob, uint* frmlen)
+{
+ uint len;
+ const void* data;
+ const NDBTAB* tab;
+ DBUG_ENTER("ndbcluster_discover");
+ DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+
+ Ndb ndb(dbname);
+ if ((ndb.init() != 0) || (ndb.waitUntilReady() != 0))
+ ERR_RETURN(ndb.getNdbError());
+
+ if (!(tab= ndb.getDictionary()->getTable(name)))
+ {
+ DBUG_PRINT("info", ("Table %s not found", name));
+ DBUG_RETURN(1);
+ }
+
+ DBUG_PRINT("info", ("Found table %s", tab->getName()));
+
+ len= tab->getFrmLength();
+ if (len == 0 || tab->getFrmData() == NULL)
+ {
+ DBUG_PRINT("No frm data found",
+ ("Table is probably created via NdbApi"));
+ DBUG_RETURN(2);
+ }
+
+ if (unpackfrm(&data, &len, tab->getFrmData()))
+ DBUG_RETURN(3);
+
+ *frmlen= len;
+ *frmblob= data;
+
+ DBUG_RETURN(0);
+}
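+
+/*
+ Illustrative server-side use (see ha_discover() in handler.cc): when
+ a frm file is missing on disk, the packed copy stored in NDB can be
+ fetched and written back, e.g. with a path variable assumed here:
+
+   const void *frmblob;
+   uint frmlen;
+   if (!ndbcluster_discover("test", "t1", &frmblob, &frmlen))
+     writefrm(path, frmblob, frmlen); // recreate test/t1.frm locally
+*/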
+
+static Ndb* g_ndb= NULL;
+
+#ifdef USE_DISCOVER_ON_STARTUP
+/*
+ Discover tables from NDB Cluster
+ - fetch a list of tables from NDB
+ - store the frm file for each table on disk, provided
+ the table has an attached frm file and the
+ database of the table exists
+*/
+
+int ndb_discover_tables()
+{
+ uint i;
+ NdbDictionary::Dictionary::List list;
+ NdbDictionary::Dictionary* dict;
+ char path[FN_REFLEN];
+ DBUG_ENTER("ndb_discover_tables");
+
+ /* List tables in NDB Cluster kernel */
+ dict= g_ndb->getDictionary();
+ if (dict->listObjects(list,
+ NdbDictionary::Object::UserTable) != 0)
+ ERR_RETURN(g_ndb->getNdbError());
+
+ for (i= 0 ; i < list.count ; i++)
+ {
+ NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+
+ DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
+ if (create_table_from_handler(t.database, t.name, true))
+ DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+ }
+ DBUG_RETURN(0);
+}
+#endif
+
+
+/*
+ Initialise all global variables before creating
+ an NDB Cluster table handler
+ */
+
+bool ndbcluster_init()
+{
+ DBUG_ENTER("ndbcluster_init");
+ // Create a Ndb object to open the connection to NDB
+ g_ndb= new Ndb("sys");
+ if (g_ndb->init() != 0)
+ {
+ ERR_PRINT (g_ndb->getNdbError());
+ DBUG_RETURN(TRUE);
+ }
+ if (g_ndb->waitUntilReady() != 0)
+ {
+ ERR_PRINT (g_ndb->getNdbError());
+ DBUG_RETURN(TRUE);
+ }
+ (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
+ (hash_get_key) ndbcluster_get_key,0,0);
+ pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+ ndbcluster_inited= 1;
+#ifdef USE_DISCOVER_ON_STARTUP
+ if (ndb_discover_tables() != 0)
+ DBUG_RETURN(TRUE);
+#endif
+ DBUG_RETURN(false);
+}
+
+
+/*
+ End use of the NDB Cluster table handler
+ - free all global variables allocated by
+ ndbcluster_init()
+*/
+
+bool ndbcluster_end()
+{
+ DBUG_ENTER("ndbcluster_end");
+ delete g_ndb;
+ g_ndb= NULL;
+ if (!ndbcluster_inited)
+ DBUG_RETURN(0);
+ hash_free(&ndbcluster_open_tables);
+#ifdef USE_NDB_POOL
+ ndb_pool_release();
+#endif
+ pthread_mutex_destroy(&ndbcluster_mutex);
+ ndbcluster_inited= 0;
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Set m_tabname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_tabname(const char *path_name)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ end= strend(path_name)-1;
+ ptr= end;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ uint name_len= end - ptr;
+ memcpy(m_tabname, ptr + 1, name_len);
+ m_tabname[name_len]= '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+ ptr= m_tabname;
+
+ while (*ptr != '\0') {
+ *ptr = tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+/*
+ Set tabname in a given buffer from full pathname to table file
+ */
+void
+ha_ndbcluster::set_tabname(const char *path_name, char *tabname)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ end= strend(path_name)-1;
+ ptr= end;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ uint name_len= end - ptr;
+ memcpy(tabname, ptr + 1, name_len);
+ tabname[name_len]= '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+ ptr= tabname;
+
+ while (*ptr != '\0') {
+ *ptr= tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+
+/*
+ Set m_dbname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_dbname(const char *path_name)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ ptr= strend(path_name)-1;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ ptr--;
+ end= ptr;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ uint name_len= end - ptr;
+ memcpy(m_dbname, ptr + 1, name_len);
+ m_dbname[name_len]= '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+
+ ptr= m_dbname;
+
+ while (*ptr != '\0') {
+ *ptr= tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+
+ha_rows
+ha_ndbcluster::records_in_range(int inx,
+ const byte *start_key,uint start_key_len,
+ enum ha_rkey_function start_search_flag,
+ const byte *end_key,uint end_key_len,
+ enum ha_rkey_function end_search_flag)
+{
+ ha_rows records= 10;
+ KEY* key_info= table->key_info + inx;
+ uint key_length= key_info->key_length;
+
+ DBUG_ENTER("records_in_range");
+ DBUG_PRINT("enter", ("inx: %d", inx));
+ DBUG_PRINT("enter", ("start_key: %x, start_key_len: %d", start_key, start_key_len));
+ DBUG_PRINT("enter", ("start_search_flag: %d", start_search_flag));
+ DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len));
+ DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag));
+
+ /*
+ Check that start_key_len is equal to
+ the length of the used index and
+ prevent partial scan/read of hash indexes by returning HA_POS_ERROR
+ */
+ NDB_INDEX_TYPE idx_type= get_index_type(inx);
+ if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
+ start_key_len < key_length)
+ {
+ DBUG_PRINT("warning", ("Tried to use index which required"
+ "full key length: %d, HA_POS_ERROR",
+ key_length));
+ records= HA_POS_ERROR;
+ }
+ DBUG_RETURN(records);
+}
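+
+/*
+ For example (illustrative): given UNIQUE(a, b), a lookup on a alone
+ arrives with start_key_len < key_length, so HA_POS_ERROR is returned
+ and the optimizer will not attempt a partial read of the hash index.
+*/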
+
+
+/*
+ Handling the shared NDB_SHARE structure that is needed to
+ provide table locking.
+ It's also used for sharing data with other NDB handlers
+ in the same MySQL Server. There is currently not much
+ data we want to or can share.
+ */
+
+static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
+ my_bool not_used __attribute__((unused)))
+{
+ *length=share->table_name_length;
+ return (byte*) share->table_name;
+}
+
+static NDB_SHARE* get_share(const char *table_name)
+{
+ NDB_SHARE *share;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ uint length=(uint) strlen(table_name);
+ if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) table_name,
+ length)))
+ {
+ if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
+ MYF(MY_WME | MY_ZEROFILL))))
+ {
+ share->table_name_length=length;
+ share->table_name=(char*) (share+1);
+ strmov(share->table_name,table_name);
+ if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
+ {
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ my_free((gptr) share,0);
+ return 0;
+ }
+ thr_lock_init(&share->lock);
+ pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+ }
+ }
+ share->use_count++;
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return share;
+}
+
+
+static void free_share(NDB_SHARE *share)
+{
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (!--share->use_count)
+ {
+ hash_delete(&ndbcluster_open_tables, (byte*) share);
+ thr_lock_delete(&share->lock);
+ pthread_mutex_destroy(&share->mutex);
+ my_free((gptr) share, MYF(0));
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+}
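+
+/*
+ Usage sketch for the share, as in open()/close() above:
+
+   NDB_SHARE *share= get_share("./test/t1"); // use_count++
+   thr_lock_data_init(&share->lock, &m_lock, NULL);
+   ...
+   free_share(share); // use_count--, freed when it reaches zero
+*/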
+
+
+
+/*
+ Internal representation of the frm blob
+*/
+
+struct frm_blob_struct
+{
+ struct frm_blob_header
+ {
+ uint ver; // Version of header
+ uint orglen; // Original (uncompressed) length of data, 0=not compressed
+ uint complen; // Length of the compressed data
+ } head;
+ char data[1];
+};
+
+
+
+static int packfrm(const void *data, uint len,
+ const void **pack_data, uint *pack_len)
+{
+ int error;
+ ulong org_len, comp_len;
+ uint blob_len;
+ frm_blob_struct* blob;
+ DBUG_ENTER("packfrm");
+ DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
+
+ error= 1;
+ org_len= len;
+ /*
+ Note: my_compress() returns the compressed length in org_len
+ and the original length in comp_len
+ */
+ if (my_compress((byte*)data, &org_len, &comp_len))
+ goto err;
+
+ DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
+ DBUG_DUMP("compressed", (char*)data, org_len);
+
+ error= 2;
+ blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len;
+ if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
+ goto err;
+
+ // Store compressed blob in machine independent format
+ int4store((char*)(&blob->head.ver), 1);
+ int4store((char*)(&blob->head.orglen), comp_len);
+ int4store((char*)(&blob->head.complen), org_len);
+
+ // Copy frm data into blob, already in machine independent format
+ memcpy(blob->data, data, org_len);
+
+ *pack_data = blob;
+ *pack_len = blob_len;
+ error = 0;
+
+ DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
+err:
+ DBUG_RETURN(error);
+}
+
+
+static int unpackfrm(const void **unpack_data, uint *unpack_len,
+ const void *pack_data)
+{
+ const frm_blob_struct *blob = (frm_blob_struct*)pack_data;
+ byte *data;
+ ulong complen, orglen, ver;
+ DBUG_ENTER("unpackfrm");
+ DBUG_PRINT("enter", ("pack_data: %x", pack_data));
+
+ complen= uint4korr((char*)&blob->head.complen);
+ orglen= uint4korr((char*)&blob->head.orglen);
+ ver= uint4korr((char*)&blob->head.ver);
+
+ DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d",
+ ver,complen,orglen));
+ DBUG_DUMP("blob->data", (char*) blob->data, complen);
+
+ if (ver != 1)
+ DBUG_RETURN(1);
+ if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME))))
+ DBUG_RETURN(2);
+ memcpy(data, blob->data, complen);
+
+ if (my_uncompress(data, &complen, &orglen))
+ {
+ my_free((char*)data, MYF(0));
+ DBUG_RETURN(3);
+ }
+
+ *unpack_data = data;
+ *unpack_len = complen;
+
+ DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
+
+ DBUG_RETURN(0);
+}
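+
+/*
+ packfrm/unpackfrm are inverses; a round trip of frm data of length
+ len looks like (sketch; note that my_compress() compresses the
+ input buffer in place):
+
+   const void *packed, *unpacked;
+   uint packed_len, unpacked_len;
+   packfrm(frm, len, &packed, &packed_len); // compress + prepend header
+   unpackfrm(&unpacked, &unpacked_len, packed); // check ver, uncompress
+   // now unpacked_len == len and unpacked holds the original frm image
+*/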
+#endif /* HAVE_NDBCLUSTER_DB */
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
new file mode 100755
index 00000000000..ed66d07d79b
--- /dev/null
+++ b/sql/ha_ndbcluster.h
@@ -0,0 +1,218 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/*
+ This file defines the NDB Cluster handler: the interface between MySQL and
+ NDB Cluster
+*/
+
+/* The class defining a handle to an NDB Cluster table */
+
+#ifdef __GNUC__
+#pragma interface /* gcc class implementation */
+#endif
+
+#include <ndbapi_limits.h>
+#include <ndb_types.h>
+
+class Ndb; // Forward declaration
+class NdbOperation; // Forward declaration
+class NdbConnection; // Forward declaration
+class NdbRecAttr; // Forward declaration
+class NdbResultSet; // Forward declaration
+
+typedef enum ndb_index_type {
+ UNDEFINED_INDEX = 0,
+ PRIMARY_KEY_INDEX = 1,
+ UNIQUE_INDEX = 2,
+ ORDERED_INDEX = 3
+} NDB_INDEX_TYPE;
+
+
+typedef struct st_ndbcluster_share {
+ THR_LOCK lock;
+ pthread_mutex_t mutex;
+ char *table_name;
+ uint table_name_length,use_count;
+} NDB_SHARE;
+
+class ha_ndbcluster: public handler
+{
+ public:
+ ha_ndbcluster(TABLE *table);
+ ~ha_ndbcluster();
+
+ int open(const char *name, int mode, uint test_if_locked);
+ int close(void);
+
+ int write_row(byte *buf);
+ int update_row(const byte *old_data, byte *new_data);
+ int delete_row(const byte *buf);
+ int index_init(uint index);
+ int index_end();
+ int index_read(byte *buf, const byte *key, uint key_len,
+ enum ha_rkey_function find_flag);
+ int index_read_idx(byte *buf, uint index, const byte *key, uint key_len,
+ enum ha_rkey_function find_flag);
+ int index_next(byte *buf);
+ int index_prev(byte *buf);
+ int index_first(byte *buf);
+ int index_last(byte *buf);
+ int rnd_init(bool scan=1);
+ int rnd_end();
+ int rnd_next(byte *buf);
+ int rnd_pos(byte *buf, byte *pos);
+ void position(const byte *record);
+
+ void info(uint);
+ int extra(enum ha_extra_function operation);
+ int extra_opt(enum ha_extra_function operation, ulong cache_size);
+ int reset();
+ int external_lock(THD *thd, int lock_type);
+ int start_stmt(THD *thd);
+ const char * table_type() const { return("ndbcluster");}
+ const char ** bas_ext() const;
+ ulong table_flags(void) const { return m_table_flags; }
+ ulong index_flags(uint idx) const;
+ uint max_record_length() const { return NDB_MAX_TUPLE_SIZE; }
+ uint max_keys() const { return MAX_KEY; }
+ uint max_key_parts() const { return NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY; }
+ uint max_key_length() const { return NDB_MAX_KEY_SIZE; }
+
+ int rename_table(const char *from, const char *to);
+ int delete_table(const char *name);
+ int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
+ THR_LOCK_DATA **store_lock(THD *thd,
+ THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type);
+
+ bool low_byte_first() const
+ {
+#ifdef WORDS_BIGENDIAN
+ return false;
+#else
+ return true;
+#endif
+ }
+ bool has_transactions() { return true;}
+
+ const char* index_type(uint key_number) {
+ switch (get_index_type(key_number)) {
+ case ORDERED_INDEX:
+ return "BTREE";
+ case UNIQUE_INDEX:
+ case PRIMARY_KEY_INDEX:
+ default:
+ return "HASH";
+ }
+ }
+
+ double scan_time();
+ ha_rows records_in_range(int inx,
+ const byte *start_key,uint start_key_len,
+ enum ha_rkey_function start_search_flag,
+ const byte *end_key,uint end_key_len,
+ enum ha_rkey_function end_search_flag);
+
+
+ static Ndb* seize_ndb();
+ static void release_ndb(Ndb* ndb);
+
+
+ private:
+ int alter_table_name(const char *from, const char *to);
+ int drop_table();
+ int create_index(const char *name, KEY *key_info);
+ int initialize_autoincrement(const void* table);
+ int get_metadata(const char* path);
+ void release_metadata();
+ const char* get_index_name(uint idx_no) const;
+ NDB_INDEX_TYPE get_index_type(uint idx_no) const;
+ NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
+
+ int pk_read(const byte *key, uint key_len,
+ byte *buf);
+ int unique_index_read(const byte *key, uint key_len,
+ byte *buf);
+ int ordered_index_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag);
+ int full_table_scan(byte * buf);
+ int next_result(byte *buf);
+#if 0
+ int filtered_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag);
+#endif
+
+ void unpack_record(byte *buf);
+
+ void set_dbname(const char *pathname);
+ void set_tabname(const char *pathname);
+ void set_tabname(const char *pathname, char *tabname);
+
+ bool set_hidden_key(NdbOperation*,
+ uint fieldnr, const byte* field_ptr);
+ int set_ndb_key(NdbOperation*, Field *field,
+ uint fieldnr, const byte* field_ptr);
+ int set_ndb_value(NdbOperation*, Field *field, uint fieldnr);
+ int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr);
+ int set_primary_key(NdbOperation *op, const byte *key);
+ int set_primary_key(NdbOperation *op);
+ int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
+ void print_results();
+
+ longlong get_auto_increment();
+
+ int ndb_err(NdbConnection*);
+
+ private:
+ int check_ndb_connection();
+
+ NdbConnection *m_active_trans;
+ NdbResultSet *m_active_cursor;
+ Ndb *m_ndb;
+ void *m_table;
+ char m_dbname[FN_HEADLEN];
+ //char m_schemaname[FN_HEADLEN];
+ char m_tabname[FN_HEADLEN];
+ ulong m_table_flags;
+ THR_LOCK_DATA m_lock;
+ NDB_SHARE *m_share;
+ NDB_INDEX_TYPE m_indextype[MAX_KEY];
+ NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
+ bool m_use_write;
+};
+
+bool ndbcluster_init(void);
+bool ndbcluster_end(void);
+
+int ndbcluster_commit(THD *thd, void* ndb_transaction);
+int ndbcluster_rollback(THD *thd, void* ndb_transaction);
+
+void ndbcluster_close_connection(THD *thd);
+
+int ndbcluster_discover(const char* dbname, const char* name,
+ const void** frmblob, uint* frmlen);
+int ndbcluster_drop_database(const char* path);
+
+
+
+
+
+
+
+
diff --git a/sql/handler.cc b/sql/handler.cc
index 2b22563bc3c..e1ee3315f40 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -37,6 +37,9 @@
#else
#define innobase_query_caching_of_table_permitted(X,Y,Z) 1
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+#include "ha_ndbcluster.h"
+#endif
#include <myisampack.h>
#include <errno.h>
@@ -48,7 +51,7 @@ ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count,
ha_read_key_count, ha_read_next_count, ha_read_prev_count,
ha_read_first_count, ha_read_last_count,
ha_commit_count, ha_rollback_count,
- ha_read_rnd_count, ha_read_rnd_next_count;
+ ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count;
static SHOW_COMP_OPTION have_yes= SHOW_OPTION_YES;
@@ -76,6 +79,10 @@ struct show_table_type_st sys_table_types[]=
"Supports transactions and page-level locking", DB_TYPE_BERKELEY_DB},
{"BERKELEYDB",&have_berkeley_db,
"Alias for BDB", DB_TYPE_BERKELEY_DB},
+ {"NDBCLUSTER", &have_ndbcluster,
+ "Clustered, fault tolerant memory based tables", DB_TYPE_NDBCLUSTER},
+ {"NDB", &have_ndbcluster,
+ "Alias for NDBCLUSTER", DB_TYPE_NDBCLUSTER},
{NullS, NULL, NullS, DB_TYPE_UNKNOWN}
};
@@ -172,6 +179,10 @@ handler *get_new_handler(TABLE *table, enum db_type db_type)
case DB_TYPE_INNODB:
return new ha_innobase(table);
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ case DB_TYPE_NDBCLUSTER:
+ return new ha_ndbcluster(table);
+#endif
case DB_TYPE_HEAP:
return new ha_heap(table);
default: // should never happen
@@ -216,6 +227,18 @@ int ha_init()
opt_using_transactions=1;
}
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ {
+ if (ndbcluster_init())
+ {
+ have_ndbcluster= SHOW_OPTION_DISABLED;
+ error= 1;
+ }
+ else
+ opt_using_transactions=1;
+ }
+#endif
return error;
}
@@ -243,6 +266,10 @@ int ha_panic(enum ha_panic_function flag)
if (have_innodb == SHOW_OPTION_YES)
error|=innobase_end();
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ error|=ndbcluster_end();
+#endif
return error;
} /* ha_panic */
@@ -252,6 +279,10 @@ void ha_drop_database(char* path)
if (have_innodb == SHOW_OPTION_YES)
innobase_drop_database(path);
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ ndbcluster_drop_database(path);
+#endif
}
void ha_close_connection(THD* thd)
@@ -260,6 +291,10 @@ void ha_close_connection(THD* thd)
if (have_innodb == SHOW_OPTION_YES)
innobase_close_connection(thd);
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ ndbcluster_close_connection(thd);
+#endif
}
/*
@@ -419,6 +454,19 @@ int ha_commit_trans(THD *thd, THD_TRANS* trans)
WRITE_CACHE, (my_off_t) 0, 0, 1);
thd->transaction.trans_log.end_of_file= max_binlog_cache_size;
}
+#ifdef HAVE_NDBCLUSTER_DB
+ if (trans->ndb_tid)
+ {
+ if ((error=ndbcluster_commit(thd,trans->ndb_tid)))
+ {
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), error);
+ error=1;
+ }
+ if (trans == &thd->transaction.all)
+ operation_done= transaction_commited= 1;
+ trans->ndb_tid=0;
+ }
+#endif
#ifdef HAVE_BERKELEY_DB
if (trans->bdb_tid)
{
@@ -472,6 +520,18 @@ int ha_rollback_trans(THD *thd, THD_TRANS *trans)
if (opt_using_transactions)
{
bool operation_done=0;
+#ifdef HAVE_NDBCLUSTER_DB
+ if (trans->ndb_tid)
+ {
+ if ((error=ndbcluster_rollback(thd, trans->ndb_tid)))
+ {
+ my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), error);
+ error=1;
+ }
+ trans->ndb_tid = 0;
+ operation_done=1;
+ }
+#endif
#ifdef HAVE_BERKELEY_DB
if (trans->bdb_tid)
{
@@ -1151,8 +1211,10 @@ bool handler::caching_allowed(THD* thd, char* table_key,
** Some general functions that isn't in the handler class
****************************************************************************/
- /* Initiates table-file and calls apropriate database-creator */
- /* Returns 1 if something got wrong */
+/*
+ Initiates table-file and calls appropriate database-creator
+ Returns 1 if something went wrong
+*/
int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
bool update_create_info)
@@ -1168,7 +1230,7 @@ int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
{
update_create_info_from_table(create_info, &table);
if (table.file->table_flags() & HA_DROP_BEFORE_CREATE)
- table.file->delete_table(name); // Needed for BDB tables
+ table.file->delete_table(name);
}
if (lower_case_table_names == 2 &&
!(table.file->table_flags() & HA_FILE_BASED))
@@ -1290,6 +1352,26 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
/*
+ Try to discover one table from handler(s)
+*/
+
+int ha_discover(const char* dbname, const char* name,
+ const void** frmblob, uint* frmlen)
+{
+ int error= 1; // Table does not exist in any handler
+ DBUG_ENTER("ha_discover");
+ DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ error= ndbcluster_discover(dbname, name, frmblob, frmlen);
+#endif
+ if (!error)
+ statistic_increment(ha_discover_count,&LOCK_status);
+ DBUG_RETURN(error);
+}
+
+
+/*
Read first row between two ranges.
Store ranges for future calls to read_range_next
@@ -1425,3 +1507,5 @@ int handler::compare_key(key_range *range)
}
return key_compare_result_on_equal;
}
+
+
diff --git a/sql/handler.h b/sql/handler.h
index 4cb6ab86a37..a5d4f78ef29 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -28,7 +28,8 @@
#define NO_HASH /* Not yet implemented */
#endif
-#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB)
+#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB) || \
+ defined(HAVE_NDBCLUSTER_DB)
#define USING_TRANSACTIONS
#endif
@@ -80,7 +81,6 @@
#define HA_FILE_BASED (1 << 26)
-
/* bits in index_flags(index_number) for what you can do with index */
#define HA_WRONG_ASCII_ORDER 1 /* Can't use sorting through key */
#define HA_READ_NEXT 2 /* Read next record with same key */
@@ -141,12 +141,17 @@
#define HA_CACHE_TBL_ASKTRANSACT 1
#define HA_CACHE_TBL_TRANSACT 2
-enum db_type { DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1,
- DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM,
- DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM,
- DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM,
- DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB, DB_TYPE_GEMINI,
- DB_TYPE_DEFAULT };
+enum db_type
+{
+ DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1,
+ DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM,
+ DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM,
+ DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM,
+ DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB,
+ DB_TYPE_GEMINI, DB_TYPE_NDBCLUSTER,
+
+ DB_TYPE_DEFAULT // Must be last
+};
struct show_table_type_st {
const char *type;
@@ -176,6 +181,7 @@ typedef struct st_thd_trans {
void *bdb_tid;
void *innobase_tid;
bool innodb_active_trans;
+ void *ndb_tid;
} THD_TRANS;
enum enum_tx_isolation { ISO_READ_UNCOMMITTED, ISO_READ_COMMITTED,
@@ -479,3 +485,5 @@ bool ha_flush_logs(void);
int ha_recovery_logging(THD *thd, bool on);
int ha_change_key_cache(KEY_CACHE *old_key_cache,
KEY_CACHE *new_key_cache);
+int ha_discover(const char* dbname, const char* name,
+ const void** frmblob, uint* frmlen);
diff --git a/sql/lex.h b/sql/lex.h
index 94ea0295f05..e5bc537c213 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -48,7 +48,7 @@ SYM_GROUP sym_group_rtree= {"RTree keys", "HAVE_RTREE_KEYS"};
*/
static SYMBOL symbols[] = {
- { "&&", SYM(AND)},
+ { "&&", SYM(AND_SYM)},
{ "<", SYM(LT)},
{ "<=", SYM(LE)},
{ "<>", SYM(NE)},
@@ -67,7 +67,7 @@ static SYMBOL symbols[] = {
{ "ALL", SYM(ALL)},
{ "ALTER", SYM(ALTER)},
{ "ANALYZE", SYM(ANALYZE_SYM)},
- { "AND", SYM(AND)},
+ { "AND", SYM(AND_SYM)},
{ "ANY", SYM(ANY_SYM)},
{ "AS", SYM(AS)},
{ "ASC", SYM(ASC)},
@@ -295,6 +295,8 @@ static SYMBOL symbols[] = {
{ "NAMES", SYM(NAMES_SYM)},
{ "NATIONAL", SYM(NATIONAL_SYM)},
{ "NATURAL", SYM(NATURAL)},
+ { "NDB", SYM(NDBCLUSTER_SYM)},
+ { "NDBCLUSTER", SYM(NDBCLUSTER_SYM)},
{ "NCHAR", SYM(NCHAR_SYM)},
{ "NEW", SYM(NEW_SYM)},
{ "NEXT", SYM(NEXT_SYM)},
@@ -312,7 +314,7 @@ static SYMBOL symbols[] = {
{ "OPTIMIZE", SYM(OPTIMIZE)},
{ "OPTION", SYM(OPTION)},
{ "OPTIONALLY", SYM(OPTIONALLY)},
- { "OR", SYM(OR)},
+ { "OR", SYM(OR_SYM)},
{ "ORDER", SYM(ORDER_SYM)},
{ "OUTER", SYM(OUTER)},
{ "OUTFILE", SYM(OUTFILE)},
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 1175b93b9ba..9e7db79a7b2 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -837,7 +837,7 @@ extern ulong server_id, concurrency;
extern ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count;
extern ulong ha_read_key_count, ha_read_next_count, ha_read_prev_count;
extern ulong ha_read_first_count, ha_read_last_count;
-extern ulong ha_read_rnd_count, ha_read_rnd_next_count;
+extern ulong ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count;
extern ulong ha_commit_count, ha_rollback_count,table_cache_size;
extern ulong max_connections,max_connect_errors, connect_timeout;
extern ulong slave_net_timeout;
@@ -891,6 +891,7 @@ extern SHOW_VAR init_vars[],status_vars[], internal_vars[];
extern SHOW_COMP_OPTION have_isam;
extern SHOW_COMP_OPTION have_innodb;
extern SHOW_COMP_OPTION have_berkeley_db;
+extern SHOW_COMP_OPTION have_ndbcluster;
extern struct system_variables global_system_variables;
extern struct system_variables max_system_variables;
extern struct rand_struct sql_rand;
@@ -960,6 +961,10 @@ int format_number(uint inputflag,uint max_length,my_string pos,uint length,
my_string *errpos);
int openfrm(const char *name,const char *alias,uint filestat,uint prgflag,
uint ha_open_flags, TABLE *outparam);
+int readfrm(const char *name, const void** data, uint* length);
+int writefrm(const char* name, const void* data, uint len);
+int create_table_from_handler(const char *db, const char *name,
+ bool create_if_found);
int closefrm(TABLE *table);
db_type get_table_type(const char *name);
int read_string(File file, gptr *to, uint length);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 5176ee33a17..2e0e8493aad 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -32,6 +32,9 @@
#ifdef HAVE_ISAM
#include "ha_isam.h"
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+#include "ha_ndbcluster.h"
+#endif
#include <nisam.h>
#include <thr_alarm.h>
#include <ft_global.h>
@@ -261,7 +264,7 @@ my_bool opt_local_infile, opt_external_locking, opt_slave_compressed_protocol;
my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0;
-my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam;
+my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
my_bool opt_readonly, use_temp_pool, relay_log_purge;
my_bool opt_sync_bdb_logs, opt_sync_frm;
my_bool opt_secure_auth= 0;
@@ -370,7 +373,7 @@ KEY_CACHE *sql_key_cache;
CHARSET_INFO *system_charset_info, *files_charset_info ;
CHARSET_INFO *national_charset_info, *table_alias_charset;
-SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam;
+SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam, have_ndbcluster;
SHOW_COMP_OPTION have_raid, have_openssl, have_symlink, have_query_cache;
SHOW_COMP_OPTION have_crypt, have_compress;
@@ -3625,7 +3628,7 @@ enum options_mysqld
OPT_INNODB_FAST_SHUTDOWN,
OPT_INNODB_FILE_PER_TABLE,
OPT_SAFE_SHOW_DB,
- OPT_INNODB, OPT_ISAM, OPT_SKIP_SAFEMALLOC,
+ OPT_INNODB, OPT_ISAM, OPT_NDBCLUSTER, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL,
@@ -4158,6 +4161,13 @@ Disable with --skip-innodb (will save memory).",
Disable with --skip-isam.",
(gptr*) &opt_isam, (gptr*) &opt_isam, 0, GET_BOOL, NO_ARG, 1, 0, 0,
0, 0, 0},
+#ifdef HAVE_NDBCLUSTER_DB
+ {"ndbcluster", OPT_NDBCLUSTER, "Enable NDB Cluster (if this version of MySQL
+supports it). \
+Disable with --skip-ndbcluster (will save memory).",
+ (gptr*) &opt_ndbcluster, (gptr*) &opt_ndbcluster, 0, GET_BOOL, NO_ARG, 1, 0, 0,
+ 0, 0, 0},
+#endif
{"skip-locking", OPT_SKIP_LOCK,
"Deprecated option, use --skip-external-locking instead.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
@@ -4828,6 +4838,7 @@ struct show_var_st status_vars[]= {
{"Handler_rollback", (char*) &ha_rollback_count, SHOW_LONG},
{"Handler_update", (char*) &ha_update_count, SHOW_LONG},
{"Handler_write", (char*) &ha_write_count, SHOW_LONG},
+ {"Handler_discover", (char*) &ha_discover_count, SHOW_LONG},
{"Key_blocks_not_flushed", (char*) &dflt_key_cache_var.global_blocks_changed,
SHOW_KEY_CACHE_LONG},
{"Key_blocks_used", (char*) &dflt_key_cache_var.global_blocks_used,
@@ -5121,6 +5132,11 @@ static void mysql_init_variables(void)
#else
have_isam=SHOW_OPTION_NO;
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ have_ndbcluster=SHOW_OPTION_YES;
+#else
+ have_ndbcluster=SHOW_OPTION_NO;
+#endif
#ifdef USE_RAID
have_raid=SHOW_OPTION_YES;
#else
@@ -5591,6 +5607,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
have_isam= SHOW_OPTION_DISABLED;
#endif
break;
+ case OPT_NDBCLUSTER:
+#ifdef HAVE_NDBCLUSTER_DB
+ if (opt_ndbcluster)
+ have_ndbcluster=SHOW_OPTION_YES;
+ else
+ have_ndbcluster=SHOW_OPTION_DISABLED;
+#endif
+ break;
case OPT_INNODB:
#ifdef HAVE_INNOBASE_DB
if (opt_innodb)
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 5ccda5c7052..0cf4d2d2691 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -59,6 +59,9 @@
#ifdef HAVE_INNOBASE_DB
#include "ha_innodb.h"
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+#include "ha_ndbcluster.h"
+#endif
static HASH system_variable_hash;
const char *bool_type_names[]= { "OFF", "ON", NullS };
@@ -638,6 +641,7 @@ struct show_var_st init_vars[]= {
{"have_crypt", (char*) &have_crypt, SHOW_HAVE},
{"have_innodb", (char*) &have_innodb, SHOW_HAVE},
{"have_isam", (char*) &have_isam, SHOW_HAVE},
+ {"have_ndbcluster", (char*) &have_ndbcluster, SHOW_HAVE},
{"have_openssl", (char*) &have_openssl, SHOW_HAVE},
{"have_query_cache", (char*) &have_query_cache, SHOW_HAVE},
{"have_raid", (char*) &have_raid, SHOW_HAVE},
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 8c4571dd540..d24b13ff96f 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -1317,18 +1317,34 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db,
{
char path[FN_REFLEN];
int error;
+ uint discover_retry_count= 0;
DBUG_ENTER("open_unireg_entry");
strxmov(path, mysql_data_home, "/", db, "/", name, NullS);
- if (openfrm(path,alias,
+ while (openfrm(path,alias,
(uint) (HA_OPEN_KEYFILE | HA_OPEN_RNDFILE | HA_GET_INDEX |
HA_TRY_READ_ONLY),
READ_KEYINFO | COMPUTE_TYPES | EXTRA_RECORD,
thd->open_options, entry))
{
if (!entry->crashed)
- goto err; // Can't repair the table
+ {
+ /*
+ Frm file could not be found on disk.
+ Since it does not exist, no one can be using it.
+ LOCK_open has been locked to protect from someone else
+ trying to discover the table at the same time.
+ */
+ if (discover_retry_count++ != 0)
+ goto err;
+ if (create_table_from_handler(db, name, true) != 0)
+ goto err;
+
+ thd->clear_error(); // Clear error message
+ continue;
+ }
+ // Code below is for repairing a crashed file
TABLE_LIST table_list;
bzero((char*) &table_list, sizeof(table_list)); // just for safe
table_list.db=(char*) db;
@@ -1374,6 +1390,7 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db,
if (error)
goto err;
+ break;
}
/*
If we are here, there was no fatal error (but error may be still
diff --git a/sql/sql_class.h b/sql/sql_class.h
index aa526d5e474..d5eb7a9fd0e 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -688,7 +688,10 @@ public:
THD_TRANS all; // Trans since BEGIN WORK
THD_TRANS stmt; // Trans for current statement
uint bdb_lock_count;
-
+ uint ndb_lock_count;
+#ifdef HAVE_NDBCLUSTER_DB
+ void* ndb;
+#endif
/*
Tables changed in transaction (that must be invalidated in query cache).
List contain only transactional tables, that not invalidated in query
@@ -878,7 +881,8 @@ public:
{
#ifdef USING_TRANSACTIONS
return (transaction.all.bdb_tid != 0 ||
- transaction.all.innodb_active_trans != 0);
+ transaction.all.innodb_active_trans != 0 ||
+ transaction.all.ndb_tid != 0);
#else
return 0;
#endif
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index d5f77bf545d..c46a9823a52 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -1148,6 +1148,35 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name,
}
}
+ /*
+ Check that a table with the given name does not already
+ exist in any storage engine. In such a case it should
+ be discovered and the error ER_TABLE_EXISTS_ERROR returned,
+ unless the user specified CREATE TABLE IF NOT EXISTS.
+ The LOCK_open mutex has been locked to make sure no
+ one else is attempting to discover the table. Since
+ it's not on disk as a frm file, no one could be using it!
+ */
+ if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+ {
+ bool create_if_not_exists =
+ create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
+ if (!create_table_from_handler(db, table_name,
+ create_if_not_exists))
+ {
+ DBUG_PRINT("info", ("Table already existed in handler"));
+
+ if (create_if_not_exists)
+ {
+ create_info->table_existed= 1; // Mark that table existed
+ error= 0;
+ }
+ else
+ my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
+ goto end;
+ }
+ }
+
thd->proc_info="creating table";
create_info->table_existed= 0; // Mark that table is created
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index b87b0b29677..568a526fd58 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -181,7 +181,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token ACTION
%token AGGREGATE_SYM
%token ALL
-%token AND
+%token AND_SYM
%token AS
%token ASC
%token AUTO_INC
@@ -305,6 +305,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token NAMES_SYM
%token NATIONAL_SYM
%token NATURAL
+%token NDBCLUSTER_SYM
%token NEW_SYM
%token NCHAR_SYM
%token NCHAR_STRING
@@ -318,7 +319,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token OPEN_SYM
%token OPTION
%token OPTIONALLY
-%token OR
+%token OR_SYM
%token OR_OR_CONCAT
%token ORDER_SYM
%token OUTER
@@ -574,8 +575,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token BEFORE_SYM
%left SET_VAR
-%left OR_OR_CONCAT OR XOR
-%left AND
+%left OR_OR_CONCAT OR_SYM XOR
+%left AND_SYM
%left BETWEEN_SYM CASE_SYM WHEN_SYM THEN_SYM ELSE
%left EQ EQUAL_SYM GE GT_SYM LE LT NE IS LIKE REGEXP IN_SYM
%left '|'
@@ -727,7 +728,7 @@ END_OF_INPUT
%type <NONE>
'-' '+' '*' '/' '%' '(' ')'
- ',' '!' '{' '}' '&' '|' AND OR OR_OR_CONCAT BETWEEN_SYM CASE_SYM
+ ',' '!' '{' '}' '&' '|' AND_SYM OR_SYM OR_OR_CONCAT BETWEEN_SYM CASE_SYM
THEN_SYM WHEN_SYM DIV_SYM MOD_SYM
%%
@@ -2421,14 +2422,14 @@ expr_expr:
{
$$= new Item_func_not(new Item_in_subselect($1, $4));
}
- | expr BETWEEN_SYM no_and_expr AND expr
+ | expr BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_between($1,$3,$5); }
- | expr NOT BETWEEN_SYM no_and_expr AND expr
+ | expr NOT BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_not(new Item_func_between($1,$4,$6)); }
| expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); }
- | expr OR expr { $$= new Item_cond_or($1,$3); }
+ | expr OR_SYM expr { $$= new Item_cond_or($1,$3); }
| expr XOR expr { $$= new Item_cond_xor($1,$3); }
- | expr AND expr { $$= new Item_cond_and($1,$3); }
+ | expr AND_SYM expr { $$= new Item_cond_and($1,$3); }
| expr SOUNDS_SYM LIKE expr
{
$$= new Item_func_eq(new Item_func_soundex($1),
@@ -2469,14 +2470,14 @@ expr_expr:
/* expressions that begin with 'expr' that do NOT follow IN_SYM */
no_in_expr:
- no_in_expr BETWEEN_SYM no_and_expr AND expr
+ no_in_expr BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_between($1,$3,$5); }
- | no_in_expr NOT BETWEEN_SYM no_and_expr AND expr
+ | no_in_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_not(new Item_func_between($1,$4,$6)); }
| no_in_expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); }
- | no_in_expr OR expr { $$= new Item_cond_or($1,$3); }
+ | no_in_expr OR_SYM expr { $$= new Item_cond_or($1,$3); }
| no_in_expr XOR expr { $$= new Item_cond_xor($1,$3); }
- | no_in_expr AND expr { $$= new Item_cond_and($1,$3); }
+ | no_in_expr AND_SYM expr { $$= new Item_cond_and($1,$3); }
| no_in_expr SOUNDS_SYM LIKE expr
{
$$= new Item_func_eq(new Item_func_soundex($1),
@@ -2527,12 +2528,12 @@ no_and_expr:
{
$$= new Item_func_not(new Item_in_subselect($1, $4));
}
- | no_and_expr BETWEEN_SYM no_and_expr AND expr
+ | no_and_expr BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_between($1,$3,$5); }
- | no_and_expr NOT BETWEEN_SYM no_and_expr AND expr
+ | no_and_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_not(new Item_func_between($1,$4,$6)); }
| no_and_expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); }
- | no_and_expr OR expr { $$= new Item_cond_or($1,$3); }
+ | no_and_expr OR_SYM expr { $$= new Item_cond_or($1,$3); }
| no_and_expr XOR expr { $$= new Item_cond_xor($1,$3); }
| no_and_expr SOUNDS_SYM LIKE expr
{
@@ -4147,8 +4148,8 @@ show_param:
YYABORT;
}
| NEW_SYM MASTER_SYM FOR_SYM SLAVE WITH MASTER_LOG_FILE_SYM EQ
- TEXT_STRING_sys AND MASTER_LOG_POS_SYM EQ ulonglong_num
- AND MASTER_SERVER_ID_SYM EQ
+ TEXT_STRING_sys AND_SYM MASTER_LOG_POS_SYM EQ ulonglong_num
+ AND_SYM MASTER_SERVER_ID_SYM EQ
ULONG_NUM
{
Lex->sql_command = SQLCOM_SHOW_NEW_MASTER;
@@ -4983,6 +4984,7 @@ keyword:
| NAMES_SYM {}
| NATIONAL_SYM {}
| NCHAR_SYM {}
+ | NDBCLUSTER_SYM {}
| NEXT_SYM {}
| NEW_SYM {}
| NO_SYM {}
@@ -5434,7 +5436,7 @@ grant_privilege:
opt_and:
/* empty */ {}
- | AND {}
+ | AND_SYM {}
;
require_list:
diff --git a/sql/table.cc b/sql/table.cc
index 1b7d30560ef..281a8c10409 100644
--- a/sql/table.cc
+++ b/sql/table.cc
@@ -944,7 +944,8 @@ static void frm_error(int error, TABLE *form, const char *name, myf errortype)
break;
case 2:
{
- datext=form->file ? *form->file->bas_ext() : "";
+ datext= form->file ? *form->file->bas_ext() : "";
+ datext= datext==NullS ? "" : datext;
err_no= (my_errno == ENOENT) ? ER_FILE_NOT_FOUND : (my_errno == EAGAIN) ?
ER_FILE_USED : ER_CANT_OPEN_FILE;
my_error(err_no,errortype,