Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc | 1196
1 file changed, 864 insertions(+), 332 deletions(-)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 5b36d6d2b55..09cb0c0f02d 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -41,7 +41,13 @@ static const int parallelism= 240;
// Default value for max number of transactions
// createable against NDB from this handler
-static const int max_transactions = 256;
+static const int max_transactions= 256;
+
+// Default value for prefetch of autoincrement values
+static const ha_rows autoincrement_prefetch= 32;
+
+// connectstring to cluster if given by mysqld
+const char *ndbcluster_connectstring= 0;
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
@@ -138,6 +144,7 @@ static int ndb_to_mysql_error(const NdbError *err)
int ha_ndbcluster::ndb_err(NdbConnection *trans)
{
+ int res;
const NdbError err= trans->getNdbError();
if (!err.code)
return 0; // Don't log things to DBUG log if no error
@@ -155,7 +162,13 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
default:
break;
}
- DBUG_RETURN(ndb_to_mysql_error(&err));
+ res= ndb_to_mysql_error(&err);
+ DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d",
+ err.code, res));
+ if (res == HA_ERR_FOUND_DUPP_KEY)
+ dupkey= table->primary_key;
+
+ DBUG_RETURN(res);
}
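The hunk above translates the engine error once and remembers which key caused a duplicate-key failure, so that a later info(HA_STATUS_ERRKEY) call (also changed in this patch) can report it. A standalone sketch of that translate-and-remember pattern; the error codes and names here are illustrative only, not the real mapping table:

    // Sketch of the translate-and-remember error pattern (illustrative codes).
    static const int SKETCH_HA_ERR_FOUND_DUPP_KEY= 121;  // assumed handler code

    static int sketch_to_mysql_error(int ndb_code)
    {
      // The real mapping lives in ndb_to_mysql_error(); this stands in.
      return (ndb_code == 630) ? SKETCH_HA_ERR_FOUND_DUPP_KEY : -1;
    }

    static int sketch_ndb_err(int ndb_code, unsigned primary_key,
                              unsigned *dupkey)
    {
      int res= sketch_to_mysql_error(ndb_code);
      if (res == SKETCH_HA_ERR_FOUND_DUPP_KEY)
        *dupkey= primary_key;        // reported later via HA_STATUS_ERRKEY
      return res;
    }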
@@ -182,6 +195,45 @@ bool ha_ndbcluster::get_error_message(int error,
/*
+ Check if type is supported by NDB.
+ TODO Use this once, not in every operation
+*/
+
+static inline bool ndb_supported_type(enum_field_types type)
+{
+ switch (type) {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ return true;
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ break;
+ }
+ return false;
+}
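Per the TODO above, the check could run once per table instead of in every set/get call. A hedged sketch of that hoisting (this helper is hypothetical, not part of the patch):

    // Hypothetical open()-time hoisting of the per-field support check,
    // so set_ndb_key/set_ndb_value need not re-test on every operation.
    static bool all_fields_supported(TABLE *tab)
    {
      for (uint i= 0; i < tab->fields; i++)
        if (!ndb_supported_type(tab->field[i]->type()))
          return false;              // reject the table once, up front
      return true;
    }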
+
+
+/*
Instruct NDB to set the value of the hidden primary key
*/
@@ -208,40 +260,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
pack_len));
DBUG_DUMP("key", (char*)field_ptr, pack_len);
- switch (field->type()) {
- case MYSQL_TYPE_DECIMAL:
- case MYSQL_TYPE_TINY:
- case MYSQL_TYPE_SHORT:
- case MYSQL_TYPE_LONG:
- case MYSQL_TYPE_FLOAT:
- case MYSQL_TYPE_DOUBLE:
- case MYSQL_TYPE_TIMESTAMP:
- case MYSQL_TYPE_LONGLONG:
- case MYSQL_TYPE_INT24:
- case MYSQL_TYPE_DATE:
- case MYSQL_TYPE_TIME:
- case MYSQL_TYPE_DATETIME:
- case MYSQL_TYPE_YEAR:
- case MYSQL_TYPE_NEWDATE:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_VAR_STRING:
- case MYSQL_TYPE_STRING:
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
-
- case MYSQL_TYPE_TINY_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_NULL:
- case MYSQL_TYPE_GEOMETRY:
- default:
- // Unhandled field types
- DBUG_PRINT("error", ("Field type %d not supported", field->type()));
- DBUG_RETURN(2);
+ if (ndb_supported_type(field->type()))
+ {
+ if (! (field->flags & BLOB_FLAG))
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
}
- DBUG_RETURN(3);
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
}
@@ -259,63 +286,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
fieldnr, field->field_name, field->type(),
pack_len, field->is_null()?"Y":"N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len);
-
- if (field->is_null())
+
+ if (ndb_supported_type(field->type()))
{
- // Set value to NULL
- DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
- }
-
- switch (field->type()) {
- case MYSQL_TYPE_DECIMAL:
- case MYSQL_TYPE_TINY:
- case MYSQL_TYPE_SHORT:
- case MYSQL_TYPE_LONG:
- case MYSQL_TYPE_FLOAT:
- case MYSQL_TYPE_DOUBLE:
- case MYSQL_TYPE_TIMESTAMP:
- case MYSQL_TYPE_LONGLONG:
- case MYSQL_TYPE_INT24:
- case MYSQL_TYPE_DATE:
- case MYSQL_TYPE_TIME:
- case MYSQL_TYPE_DATETIME:
- case MYSQL_TYPE_YEAR:
- case MYSQL_TYPE_NEWDATE:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_VAR_STRING:
- case MYSQL_TYPE_STRING:
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
-
- case MYSQL_TYPE_TINY_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_NULL:
- case MYSQL_TYPE_GEOMETRY:
- default:
- // Unhandled field types
- DBUG_PRINT("error", ("Field type %d not supported", field->type()));
- DBUG_RETURN(2);
+ if (! (field->flags & BLOB_FLAG))
+ {
+ if (field->is_null())
+ // Set value to NULL
+ DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
+ }
+
+ // Blob type
+ NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
+ if (ndb_blob != NULL)
+ {
+ if (field->is_null())
+ DBUG_RETURN(ndb_blob->setNull() != 0);
+
+ Field_blob *field_blob= (Field_blob*)field;
+
+ // Get length and pointer to data
+ uint32 blob_len= field_blob->get_length(field_ptr);
+ char* blob_ptr= NULL;
+ field_blob->get_ptr(&blob_ptr);
+
+ // Looks like NULL blob can also be signaled in this way
+ if (blob_ptr == NULL)
+ DBUG_RETURN(ndb_blob->setNull() != 0);
+
+ DBUG_PRINT("value", ("set blob ptr=%x len=%u",
+ (unsigned)blob_ptr, blob_len));
+ DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
+
+ // No callback needed to write value
+ DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
+ }
+ DBUG_RETURN(1);
}
- DBUG_RETURN(3);
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+}
+
+
+/*
+ Callback to read all blob values.
+ - not done in unpack_record because unpack_record is valid
+ after execute(Commit) but reading blobs is not
+ - may only generate read operations; they have to be executed
+ somewhere before the data is available
+ - due to single buffer for all blobs, we let the last blob
+ process all blobs (last so that all are active)
+ - null bit is still set in unpack_record
+ - TODO allocate blob part aligned buffers
+*/
+
+NdbBlob::ActiveHook g_get_ndb_blobs_value;
+
+int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
+{
+ DBUG_ENTER("g_get_ndb_blobs_value");
+ if (ndb_blob->blobsNextBlob() != NULL)
+ DBUG_RETURN(0);
+ ha_ndbcluster *ha= (ha_ndbcluster *)arg;
+ DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
+}
+
+int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
+{
+ DBUG_ENTER("get_ndb_blobs_value");
+
+ // Field has no field number so cannot use TABLE blob_field
+ // Loop twice, first only counting total buffer size
+ for (int loop= 0; loop <= 1; loop++)
+ {
+ uint32 offset= 0;
+ for (uint i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ NdbValue value= m_value[i];
+ if (value.ptr != NULL && (field->flags & BLOB_FLAG))
+ {
+ Field_blob *field_blob= (Field_blob *)field;
+ NdbBlob *ndb_blob= value.blob;
+ Uint64 blob_len= 0;
+ if (ndb_blob->getLength(blob_len) != 0)
+ DBUG_RETURN(-1);
+ // Align to Uint64
+ uint32 blob_size= blob_len;
+ if (blob_size % 8 != 0)
+ blob_size+= 8 - blob_size % 8;
+ if (loop == 1)
+ {
+ char *buf= blobs_buffer + offset;
+ uint32 len= 0xffffffff; // Max uint32
+ DBUG_PRINT("value", ("read blob ptr=%x len=%u",
+ (uint)buf, (uint)blob_len));
+ if (ndb_blob->readData(buf, len) != 0)
+ DBUG_RETURN(-1);
+ DBUG_ASSERT(len == blob_len);
+ field_blob->set_ptr(len, buf);
+ }
+ offset+= blob_size;
+ }
+ }
+ if (loop == 0 && offset > blobs_buffer_size)
+ {
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ blobs_buffer_size= 0;
+ DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
+ blobs_buffer= my_malloc(offset, MYF(MY_WME));
+ if (blobs_buffer == NULL)
+ DBUG_RETURN(-1);
+ blobs_buffer_size= offset;
+ }
+ }
+ DBUG_RETURN(0);
}
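The two-pass shape of get_ndb_blobs_value (pass 0 sums the 8-byte-aligned lengths, pass 1 reads everything into one grown buffer) generalizes; a standalone sketch of the same scheme:

    // Standalone sketch of the two-pass single-buffer gather used above.
    #include <cstdlib>
    #include <cstring>

    static size_t align8(size_t n) { return (n + 7) & ~(size_t) 7; }

    static char *gather(const char *const *src, const size_t *len, int n,
                        char *buf, size_t *buf_size)
    {
      for (int loop= 0; loop <= 1; loop++)
      {
        size_t offset= 0;
        for (int i= 0; i < n; i++)
        {
          if (loop == 1)
            memcpy(buf + offset, src[i], len[i]);  // pass 1: fill the buffer
          offset+= align8(len[i]);                 // both passes: advance
        }
        if (loop == 0 && offset > *buf_size)       // pass 0: grow if needed
        {
          free(buf);
          if (!(buf= (char*) malloc(offset)))
            return NULL;
          *buf_size= offset;
        }
      }
      return buf;
    }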
/*
Instruct NDB to fetch one field
- - data is read directly into buffer provided by field_ptr
- if it's NULL, data is read into memory provided by NDBAPI
+ - data is read directly into buffer provided by field
+ if field is NULL, data is read into memory provided by NDBAPI
*/
-int ha_ndbcluster::get_ndb_value(NdbOperation *op,
- uint field_no, byte *field_ptr)
+int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
+ uint fieldnr)
{
DBUG_ENTER("get_ndb_value");
- DBUG_PRINT("enter", ("field_no: %d", field_no));
- m_value[field_no]= op->getValue(field_no, field_ptr);
- DBUG_RETURN(m_value == NULL);
+ DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
+ (int)(field != NULL ? field->flags : 0)));
+
+ if (field != NULL)
+ {
+ if (ndb_supported_type(field->type()))
+ {
+ DBUG_ASSERT(field->ptr != NULL);
+ if (! (field->flags & BLOB_FLAG))
+ {
+ m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr);
+ DBUG_RETURN(m_value[fieldnr].rec == NULL);
+ }
+
+ // Blob type
+ NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
+ m_value[fieldnr].blob= ndb_blob;
+ if (ndb_blob != NULL)
+ {
+ // Set callback
+ void *arg= (void *)this;
+ DBUG_RETURN(ndb_blob->setActiveHook(g_get_ndb_blobs_value, arg) != 0);
+ }
+ DBUG_RETURN(1);
+ }
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+
+ // Used for hidden key only
+ m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
+ DBUG_RETURN(m_value[fieldnr].rec == NULL);
+}
+
+
+/*
+  Check whether the current query reads or writes any blob value.
+*/
+bool ha_ndbcluster::uses_blob_value(bool all_fields)
+{
+ if (table->blob_fields == 0)
+ return false;
+ if (all_fields)
+ return true;
+ {
+ uint no_fields= table->fields;
+ int i;
+ THD *thd= current_thd;
+ // They always put blobs at the end..
+ for (i= no_fields - 1; i >= 0; i--)
+ {
+ Field *field= table->field[i];
+ if (thd->query_id == field->query_id)
+ {
+ return true;
+ }
+ }
+ }
+ return false;
}
@@ -391,41 +552,95 @@ int ha_ndbcluster::get_metadata(const char *path)
// All checks OK, lets use the table
m_table= (void*)tab;
- DBUG_RETURN(build_index_list());
+ DBUG_RETURN(build_index_list(table, ILBP_OPEN));
}
-int ha_ndbcluster::build_index_list()
+
+int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
{
+ int error= 0;
char *name;
const char *index_name;
static const char* unique_suffix= "$unique";
uint i, name_len;
+ KEY* key_info= tab->key_info;
+ const char **key_name= tab->keynames.type_names;
+ NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
DBUG_ENTER("build_index_list");
// Save information about all known indexes
- for (uint i= 0; i < table->keys; i++)
+ for (i= 0; i < tab->keys; i++, key_info++, key_name++)
{
+ index_name= *key_name;
NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
- m_indextype[i]= idx_type;
-
+ m_index[i].type= idx_type;
if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
{
- index_name= get_index_name(i);
name_len= strlen(index_name)+strlen(unique_suffix)+1;
// Create name for unique index by appending "$unique";
if (!(name= my_malloc(name_len, MYF(MY_WME))))
DBUG_RETURN(2);
strxnmov(name, name_len, index_name, unique_suffix, NullS);
- m_unique_index_name[i]= name;
+ m_index[i].unique_name= name;
DBUG_PRINT("info", ("Created unique index name: %s for index %d",
name, i));
}
+ // Create secondary indexes if in create phase
+ if (phase == ILBP_CREATE)
+ {
+ DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
+
+ switch (m_index[i].type){
+
+ case PRIMARY_KEY_INDEX:
+ // Do nothing, already created
+ break;
+ case PRIMARY_KEY_ORDERED_INDEX:
+ error= create_ordered_index(index_name, key_info);
+ break;
+ case UNIQUE_ORDERED_INDEX:
+ if (!(error= create_ordered_index(index_name, key_info)))
+ error= create_unique_index(get_unique_index_name(i), key_info);
+ break;
+ case UNIQUE_INDEX:
+ error= create_unique_index(get_unique_index_name(i), key_info);
+ break;
+ case ORDERED_INDEX:
+ error= create_ordered_index(index_name, key_info);
+ break;
+ default:
+ DBUG_ASSERT(false);
+ break;
+ }
+ if (error)
+ {
+ DBUG_PRINT("error", ("Failed to create index %u", i));
+ drop_table();
+ break;
+ }
+ }
+ // Add handles to index objects
+ DBUG_PRINT("info", ("Trying to add handle to index %s", index_name));
+ if ((m_index[i].type != PRIMARY_KEY_INDEX) &&
+ (m_index[i].type != UNIQUE_INDEX))
+ {
+ const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
+ if (!index) DBUG_RETURN(1);
+ m_index[i].index= (void *) index;
+ }
+ if (m_index[i].unique_name)
+ {
+ const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname);
+ if (!index) DBUG_RETURN(1);
+ m_index[i].unique_index= (void *) index;
+ }
+ DBUG_PRINT("info", ("Added handle to index %s", index_name));
}
- DBUG_RETURN(0);
+
+ DBUG_RETURN(error);
}
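What create_ordered_index/create_unique_index amount to against the NdbDictionary API, sketched from the create_index() function later in this patch; setTable and addColumnName are assumed to match that function's usage:

    // Hedged sketch of index creation via NdbDictionary.
    int create_index_sketch(NdbDictionary::Dictionary *dict,
                            const char *name, const char *tabname,
                            KEY *key_info, bool unique)
    {
      NdbDictionary::Index ndb_index(name);
      ndb_index.setType(unique ? NdbDictionary::Index::UniqueHashIndex
                               : NdbDictionary::Index::OrderedIndex);
      ndb_index.setTable(tabname);                             // assumed call
      KEY_PART_INFO *key_part= key_info->key_part;
      KEY_PART_INFO *end= key_part + key_info->key_parts;
      for (; key_part != end; key_part++)
        ndb_index.addColumnName(key_part->field->field_name);  // assumed call
      return dict->createIndex(ndb_index) ? 1 : 0;
    }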
-
/*
Decode the type of an index from information
provided in table object
@@ -454,18 +669,29 @@ void ha_ndbcluster::release_metadata()
// Release index list
for (i= 0; i < MAX_KEY; i++)
{
- if (m_unique_index_name[i])
- my_free((char*)m_unique_index_name[i], MYF(0));
- m_unique_index_name[i]= NULL;
+ if (m_index[i].unique_name)
+ my_free((char*)m_index[i].unique_name, MYF(0));
+ m_index[i].unique_name= NULL;
+ m_index[i].unique_index= NULL;
+ m_index[i].index= NULL;
}
DBUG_VOID_RETURN;
}
-NdbCursorOperation::LockMode get_ndb_lock_type(enum thr_lock_type type)
+int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
- return (type == TL_WRITE_ALLOW_WRITE) ?
- NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read;
+ int lm;
+ if (type == TL_WRITE_ALLOW_WRITE)
+ lm= NdbScanOperation::LM_Exclusive;
+ else if (uses_blob_value(retrieve_all_fields))
+ /*
+ TODO use a new scan mode to read + lock + keyinfo
+ */
+ lm= NdbScanOperation::LM_Exclusive;
+ else
+ lm= NdbScanOperation::LM_CommittedRead;
+ return lm;
}
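The same decision in isolated form (sketch; LM_Read exists in the API but is unused here, and the blob case stays exclusive until the TODO's read+lock+keyinfo scan mode exists):

    // Equivalent sketch of the scan lock-mode decision above.
    static NdbScanOperation::LockMode
    scan_lock_mode(enum thr_lock_type type, bool query_uses_blobs)
    {
      if (type == TL_WRITE_ALLOW_WRITE)
        return NdbScanOperation::LM_Exclusive;    // scan takeover needs a lock
      if (query_uses_blobs)
        return NdbScanOperation::LM_Exclusive;    // blob parts read after row
      return NdbScanOperation::LM_CommittedRead;  // lock-free read
    }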
static const ulong index_type_flags[]=
@@ -507,13 +733,13 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
{
- return m_unique_index_name[idx_no];
+ return m_index[idx_no].unique_name;
}
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{
DBUG_ASSERT(idx_no < MAX_KEY);
- return m_indextype[idx_no];
+ return m_index[idx_no].type;
}
@@ -593,7 +819,7 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op)
Read one record from NDB using primary key
*/
-int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
+int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
{
uint no_fields= table->fields, i;
NdbConnection *trans= m_active_trans;
@@ -603,8 +829,9 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
- if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0)
- goto err;
+ if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
+ op->readTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY)
{
@@ -612,10 +839,11 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_PRINT("info", ("Using hidden key"));
DBUG_DUMP("key", (char*)key, 8);
if (set_hidden_key(op, no_fields, key))
- goto err;
+ ERR_RETURN(trans->getNdbError());
+
// Read key at the same time, for future reference
- if (get_ndb_value(op, no_fields, NULL))
- goto err;
+ if (get_ndb_value(op, NULL, no_fields))
+ ERR_RETURN(trans->getNdbError());
}
else
{
@@ -624,19 +852,20 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
return res;
}
- // Read non-key field(s)
+    // Read all wanted non-key fields (all fields if HA_EXTRA_RETRIEVE_ALL_COLS is set)
for (i= 0; i < no_fields; i++)
{
Field *field= table->field[i];
- if (thd->query_id == field->query_id)
+ if ((thd->query_id == field->query_id) ||
+ retrieve_all_fields)
{
- if (get_ndb_value(op, i, field->ptr))
- goto err;
+ if (get_ndb_value(op, field, i))
+ ERR_RETURN(trans->getNdbError());
}
else
{
// Attribute was not to be read
- m_value[i]= NULL;
+ m_value[i].ptr= NULL;
}
}
@@ -650,9 +879,55 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
unpack_record(buf);
table->status= 0;
DBUG_RETURN(0);
+}
+
+
+/*
+ Read one complementing record from NDB using primary key from old_data
+*/
+
+int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
+{
+ uint no_fields= table->fields, i;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ THD *thd= current_thd;
+ DBUG_ENTER("complemented_pk_read");
+
+ if (retrieve_all_fields)
+    // We have already retrieved all fields, nothing to complement
+ DBUG_RETURN(0);
+
+ if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
+ op->readTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ int res;
+ if ((res= set_primary_key_from_old_data(op, old_data)))
+ ERR_RETURN(trans->getNdbError());
+
+ // Read all unreferenced non-key field(s)
+ for (i= 0; i < no_fields; i++)
+ {
+ Field *field= table->field[i];
+ if (!(field->flags & PRI_KEY_FLAG) &&
+ (thd->query_id != field->query_id))
+ {
+ if (get_ndb_value(op, field, i))
+ ERR_RETURN(trans->getNdbError());
+ }
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ {
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(ndb_err(trans));
+ }
- err:
- ERR_RETURN(trans->getNdbError());
+  // The values have now been fetched from NDB
+ unpack_record(new_data);
+ table->status= 0;
+ DBUG_RETURN(0);
}
@@ -675,8 +950,9 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
- if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index),
- m_tabname)) ||
+ if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
+ m_index[active_index].unique_index,
+ (NDBTAB *) m_table)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError());
@@ -700,13 +976,13 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
{
- if (get_ndb_value(op, i, field->ptr))
+ if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
// Attribute was not to be read
- m_value[i]= NULL;
+ m_value[i].ptr= NULL;
}
}
@@ -746,14 +1022,25 @@ inline int ha_ndbcluster::next_result(byte *buf)
If this an update or delete, call nextResult with false
to process any records already cached in NdbApi
*/
- bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
+ bool contact_ndb= m_lock.type != TL_WRITE_ALLOW_WRITE;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
+ /*
+ We can only handle one tuple with blobs at a time.
+ */
+ if (ops_pending && blobs_pending)
+ {
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ ops_pending= 0;
+ blobs_pending= false;
+ }
check= cursor->nextResult(contact_ndb);
if (check == 0)
{
// One more record found
DBUG_PRINT("info", ("One more record found"));
+
unpack_record(buf);
table->status= 0;
DBUG_RETURN(0);
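The invariant the new block above enforces: only one row's blob data may be in flight at a time, so any pending scan-takeover operations are flushed before the cursor advances. A minimal model of that bookkeeping:

    // Minimal model of the pending-operation bookkeeping (sketch).
    struct scan_state {
      unsigned ops_pending;   // takeover update/delete ops not yet executed
      bool blobs_pending;     // true if any pending op touches a blob
    };

    static bool must_flush_before_next_row(const scan_state &s)
    {
      return s.ops_pending > 0 && s.blobs_pending;
    }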
@@ -791,15 +1078,17 @@ inline int ha_ndbcluster::next_result(byte *buf)
Set bounds for an ordered index scan, using key_range
*/
-int ha_ndbcluster::set_bounds(NdbOperation *op,
+int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
const key_range *key,
int bound)
{
- uint i, tot_len;
+ uint key_len, key_store_len, tot_len, key_tot_len;
byte *key_ptr;
KEY* key_info= table->key_info + active_index;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
+ Field* field;
+ bool key_nullable, key_null;
DBUG_ENTER("set_bounds");
DBUG_PRINT("enter", ("bound: %d", bound));
@@ -809,29 +1098,37 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
// Set bounds using key data
tot_len= 0;
- key_ptr= (byte *) key->key;
+ key_ptr= (byte *) key->key;
+ key_tot_len= key->length;
for (; key_part != end; key_part++)
{
- Field* field= key_part->field;
- uint32 field_len= field->pack_length();
- tot_len+= field_len;
+ field= key_part->field;
+ key_len= key_part->length;
+ key_store_len= key_part->store_length;
+ key_nullable= (bool) key_part->null_bit;
+ key_null= (field->maybe_null() && *key_ptr);
+ tot_len+= key_store_len;
const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"};
DBUG_ASSERT(bound >= 0 && bound <= 4);
- DBUG_PRINT("info", ("Set Bound%s on %s",
+ DBUG_PRINT("info", ("Set Bound%s on %s %s %s %s",
bounds[bound],
- field->field_name));
- DBUG_DUMP("key", (char*)key_ptr, field_len);
-
+ field->field_name,
+ key_nullable ? "NULLABLE" : "",
+ key_null ? "NULL":""));
+ DBUG_PRINT("info", ("Total length %ds", tot_len));
+
+ DBUG_DUMP("key", (char*) key_ptr, key_store_len);
+
if (op->setBound(field->field_name,
bound,
- key_ptr,
- field_len) != 0)
+ key_null ? 0 : (key_nullable ? key_ptr + 1 : key_ptr),
+ key_null ? 0 : key_len) != 0)
ERR_RETURN(op->getNdbError());
- key_ptr+= field_len;
-
- if (tot_len >= key->length)
+ key_ptr+= key_store_len;
+
+ if (tot_len >= key_tot_len)
break;
/*
@@ -839,7 +1136,7 @@ int ha_ndbcluster::set_bounds(NdbOperation *op,
so if this bound was not EQ, bail out and make
a best effort attempt
*/
- if (bound != NdbOperation::BoundEQ)
+ if (bound != NdbIndexScanOperation::BoundEQ)
break;
}
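The key-buffer layout decoded above: a nullable key part is stored with a leading indicator byte (so store_length exceeds length by one), and a non-zero indicator means the bound is NULL. A sketch of that decoding:

    // Sketch of reading one key part from a MySQL key buffer.
    static const char *key_part_value(const char *key_ptr, bool nullable,
                                      bool *is_null)
    {
      *is_null= nullable && key_ptr[0] != 0;   // indicator byte: non-zero = NULL
      return nullable ? key_ptr + 1 : key_ptr; // data starts after indicator
    }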
@@ -857,7 +1154,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
{
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
- NdbScanOperation *op;
+ NdbIndexScanOperation *op;
const char *index_name;
DBUG_ENTER("ordered_index_scan");
@@ -865,19 +1162,24 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
index_name= get_index_name(active_index);
- if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
+ if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
+ m_index[active_index].index,
+ (NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
+
+ NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
+ get_ndb_lock_type(m_lock.type);
+ if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
if (start_key &&
set_bounds(op, start_key,
(start_key->flag == HA_READ_KEY_EXACT) ?
- NdbOperation::BoundEQ :
+ NdbIndexScanOperation::BoundEQ :
(start_key->flag == HA_READ_AFTER_KEY) ?
- NdbOperation::BoundLT :
- NdbOperation::BoundLE))
+ NdbIndexScanOperation::BoundLT :
+ NdbIndexScanOperation::BoundLE))
DBUG_RETURN(1);
if (end_key)
@@ -888,8 +1190,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
}
else if (set_bounds(op, end_key,
(end_key->flag == HA_READ_AFTER_KEY) ?
- NdbOperation::BoundGE :
- NdbOperation::BoundGT))
+ NdbIndexScanOperation::BoundGE :
+ NdbIndexScanOperation::BoundGT))
DBUG_RETURN(1);
}
DBUG_RETURN(define_read_attrs(buf, op));
@@ -925,12 +1227,14 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname));
- if (!(op= trans->getNdbScanOperation(m_tabname)))
+ if (!(op= trans->getNdbScanOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
+ NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
+ get_ndb_lock_type(m_lock.type);
+ if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
-
+
{
// Start scan filter
NdbScanFilter sf(op);
@@ -994,9 +1298,11 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
- if (!(op=trans->getNdbScanOperation(m_tabname)))
+ if (!(op=trans->getNdbScanOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
+ NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
+ get_ndb_lock_type(m_lock.type);
+ if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
@@ -1020,12 +1326,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
(field->flags & PRI_KEY_FLAG) ||
retrieve_all_fields)
{
- if (get_ndb_value(op, i, field->ptr))
+ if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
- m_value[i]= NULL;
+ m_value[i].ptr= NULL;
}
}
@@ -1039,7 +1345,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
if (!tab->getColumn(hidden_no))
DBUG_RETURN(1);
#endif
- if (get_ndb_value(op, hidden_no, NULL))
+ if (get_ndb_value(op, NULL, hidden_no))
ERR_RETURN(op->getNdbError());
}
@@ -1056,6 +1362,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
int ha_ndbcluster::write_row(byte *record)
{
+ bool has_auto_increment;
uint i;
NdbConnection *trans= m_active_trans;
NdbOperation *op;
@@ -1065,10 +1372,10 @@ int ha_ndbcluster::write_row(byte *record)
statistic_increment(ha_write_count,&LOCK_status);
if (table->timestamp_default_now)
update_timestamp(record+table->timestamp_default_now-1);
- if (table->next_number_field && record == table->record[0])
- update_auto_increment();
+ has_auto_increment= (table->next_number_field && record == table->record[0]);
+ skip_auto_increment= table->auto_increment_field_not_null;
- if (!(op= trans->getNdbOperation(m_tabname)))
+ if (!(op= trans->getNdbOperation((NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
res= (m_use_write) ? op->writeTuple() :op->insertTuple();
@@ -1078,13 +1385,17 @@ int ha_ndbcluster::write_row(byte *record)
if (table->primary_key == MAX_KEY)
{
// Table has hidden primary key
- Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+ Uint64 auto_value= m_ndb->getAutoIncrementValue((NDBTAB *) m_table);
if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
ERR_RETURN(op->getNdbError());
}
else
{
int res;
+
+ if ((has_auto_increment) && (!skip_auto_increment))
+ update_auto_increment();
+
if ((res= set_primary_key(op)))
return res;
}
@@ -1095,7 +1406,10 @@ int ha_ndbcluster::write_row(byte *record)
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
set_ndb_value(op, field, i))
+ {
+ skip_auto_increment= true;
ERR_RETURN(op->getNdbError());
+ }
}
/*
@@ -1106,16 +1420,34 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
*/
rows_inserted++;
- if ((rows_inserted == rows_to_insert) ||
- ((rows_inserted % bulk_insert_rows) == 0))
+ bulk_insert_not_flushed= true;
+ if ((rows_to_insert == 1) ||
+ ((rows_inserted % bulk_insert_rows) == 0) ||
+ uses_blob_value(false) != 0)
{
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
- rows_inserted, bulk_insert_rows));
+ (int)rows_inserted, (int)bulk_insert_rows));
+ bulk_insert_not_flushed= false;
if (trans->execute(NoCommit) != 0)
+ {
+ skip_auto_increment= true;
DBUG_RETURN(ndb_err(trans));
+ }
}
+ if ((has_auto_increment) && (skip_auto_increment))
+ {
+ Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
+ DBUG_PRINT("info",
+ ("Trying to set next auto increment value to %lu",
+ (ulong) next_val));
+ if (m_ndb->setAutoIncrementValue((NDBTAB *) m_table, next_val, true))
+ DBUG_PRINT("info",
+ ("Setting next auto increment value to %u", next_val));
+ }
+ skip_auto_increment= true;
+
DBUG_RETURN(0);
}
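Condensed, the auto-increment flow added to write_row() is: generate a value (from the prefetch cache) only when the statement did not supply one; when it did, push NDB's counter past the supplied value so later inserts cannot collide. A self-contained model, where prefetch_next() stands in for update_auto_increment() and bump_counter() for m_ndb->setAutoIncrementValue(..., true /* increase only */):

    // Condensed model of write_row()'s auto-increment handling (sketch).
    static void prefetch_next() {}
    static void bump_counter(unsigned long long v) { (void) v; }

    static void handle_autoinc(bool has_autoinc, bool value_supplied,
                               unsigned long long supplied_value)
    {
      if (!has_autoinc)
        return;
      if (!value_supplied)
        prefetch_next();                    // handler generates the value
      else
        bump_counter(supplied_value + 1);   // push NDB's counter past it
    }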
@@ -1171,11 +1503,40 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (table->timestamp_on_update_now)
update_timestamp(new_data+table->timestamp_on_update_now-1);
- /* Check for update of primary key and return error */
+ /* Check for update of primary key for special handling */
if ((table->primary_key != MAX_KEY) &&
(key_cmp(table->primary_key, old_data, new_data)))
- DBUG_RETURN(HA_ERR_UNSUPPORTED);
-
+ {
+ int read_res, insert_res, delete_res;
+
+ DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete"));
+ // Get all old fields, since we optimize away fields not in query
+ read_res= complemented_pk_read(old_data, new_data);
+ if (read_res)
+ {
+ DBUG_PRINT("info", ("pk read failed"));
+ DBUG_RETURN(read_res);
+ }
+ // Insert new row
+ insert_res= write_row(new_data);
+ if (insert_res)
+ {
+ DBUG_PRINT("info", ("insert failed"));
+ DBUG_RETURN(insert_res);
+ }
+ // Delete old row
+ DBUG_PRINT("info", ("insert succeded"));
+ delete_res= delete_row(old_data);
+ if (delete_res)
+ {
+ DBUG_PRINT("info", ("delete failed"));
+ // Undo write_row(new_data)
+ DBUG_RETURN(delete_row(new_data));
+ }
+ DBUG_PRINT("info", ("insert+delete succeeded"));
+ DBUG_RETURN(0);
+ }
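The branch just added decomposes a primary-key update into three primary-key operations; on a failed delete, the freshly inserted row is deleted again to undo the insert. As a bare control-flow sketch of the same code:

    // Control-flow sketch of the read+insert+delete decomposition above.
    int pk_update_sketch(const byte *old_data, byte *new_data)
    {
      int res;
      if ((res= complemented_pk_read(old_data, new_data)))
        return res;                   // fetch columns the query didn't read
      if ((res= write_row(new_data)))
        return res;                   // insert the row under the new PK
      if ((res= delete_row(old_data)))
        return delete_row(new_data);  // undo the insert; report undo status
      return 0;
    }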
+
if (cursor)
{
/*
@@ -1189,10 +1550,12 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError());
ops_pending++;
+ if (uses_blob_value(false))
+ blobs_pending= true;
}
else
{
- if (!(op= trans->getNdbOperation(m_tabname)) ||
+ if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
op->updateTuple() != 0)
ERR_RETURN(trans->getNdbError());
@@ -1204,7 +1567,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
+ NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
@@ -1253,9 +1616,9 @@ int ha_ndbcluster::delete_row(const byte *record)
if (cursor)
{
/*
- We are scanning records and want to update the record
+ We are scanning records and want to delete the record
that was just found, call deleteTuple on the cursor
- to take over the lock to a new update operation
+ to take over the lock to a new delete operation
And thus setting the primary key of the record from
the active record in cursor
*/
@@ -1270,7 +1633,7 @@ int ha_ndbcluster::delete_row(const byte *record)
else
{
- if (!(op=trans->getNdbOperation(m_tabname)) ||
+ if (!(op=trans->getNdbOperation((NDBTAB *) m_table)) ||
op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
@@ -1279,7 +1642,7 @@ int ha_ndbcluster::delete_row(const byte *record)
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
+ NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef()))
@@ -1317,7 +1680,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
{
uint row_offset= (uint) (buf - table->record[0]);
Field **field, **end;
- NdbRecAttr **value= m_value;
+ NdbValue *value= m_value;
DBUG_ENTER("unpack_record");
// Set null flag(s)
@@ -1326,8 +1689,23 @@ void ha_ndbcluster::unpack_record(byte* buf)
field < end;
field++, value++)
{
- if (*value && (*value)->isNULL())
- (*field)->set_null(row_offset);
+ if ((*value).ptr)
+ {
+ if (! ((*field)->flags & BLOB_FLAG))
+ {
+ if ((*value).rec->isNULL())
+ (*field)->set_null(row_offset);
+ }
+ else
+ {
+ NdbBlob* ndb_blob= (*value).blob;
+ bool isNull= true;
+ int ret= ndb_blob->getNull(isNull);
+ DBUG_ASSERT(ret == 0);
+ if (isNull)
+ (*field)->set_null(row_offset);
+ }
+ }
}
#ifndef DBUG_OFF
@@ -1338,7 +1716,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
int hidden_no= table->fields;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
- NdbRecAttr* rec= m_value[hidden_no];
+ NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value()));
@@ -1348,7 +1726,6 @@ void ha_ndbcluster::unpack_record(byte* buf)
DBUG_VOID_RETURN;
}
-
/*
Utility function to print/dump the fetched field
*/
@@ -1366,9 +1743,9 @@ void ha_ndbcluster::print_results()
{
Field *field;
const NDBCOL *col;
- NdbRecAttr *value;
+ NdbValue value;
- if (!(value= m_value[f]))
+ if (!(value= m_value[f]).ptr)
{
fprintf(DBUG_FILE, "Field %d was not read\n", f);
continue;
@@ -1377,19 +1754,28 @@ void ha_ndbcluster::print_results()
DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
col= tab->getColumn(f);
fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
-
- if (value->isNULL())
+
+ NdbBlob *ndb_blob= NULL;
+ if (! (field->flags & BLOB_FLAG))
{
- fprintf(DBUG_FILE, "NULL\n");
- continue;
+ if (value.rec->isNULL())
+ {
+ fprintf(DBUG_FILE, "NULL\n");
+ continue;
+ }
+ }
+ else
+ {
+ ndb_blob= value.blob;
+ bool isNull= true;
+ ndb_blob->getNull(isNull);
+ if (isNull) {
+ fprintf(DBUG_FILE, "NULL\n");
+ continue;
+ }
}
switch (col->getType()) {
- case NdbDictionary::Column::Blob:
- case NdbDictionary::Column::Clob:
- case NdbDictionary::Column::Undefined:
- fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
- break;
case NdbDictionary::Column::Tinyint: {
char value= *field->ptr;
fprintf(DBUG_FILE, "Tinyint\t%d", value);
@@ -1481,6 +1867,21 @@ void ha_ndbcluster::print_results()
fprintf(DBUG_FILE, "Timespec\t%llu", value);
break;
}
+ case NdbDictionary::Column::Blob: {
+ Uint64 len= 0;
+ ndb_blob->getLength(len);
+ fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len);
+ break;
+ }
+ case NdbDictionary::Column::Text: {
+ Uint64 len= 0;
+ ndb_blob->getLength(len);
+ fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len);
+ break;
+ }
+ case NdbDictionary::Column::Undefined:
+ fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
+ break;
}
fprintf(DBUG_FILE, "\n");
@@ -1537,7 +1938,7 @@ int ha_ndbcluster::index_next(byte *buf)
{
DBUG_ENTER("index_next");
- int error = 1;
+ int error= 1;
statistic_increment(ha_read_next_count,&LOCK_status);
DBUG_RETURN(next_result(buf));
}
@@ -1628,9 +2029,13 @@ int ha_ndbcluster::rnd_init(bool scan)
NdbResultSet *cursor= m_active_cursor;
DBUG_ENTER("rnd_init");
DBUG_PRINT("enter", ("scan: %d", scan));
- // Check that cursor is not defined
+ // Check if scan is to be restarted
if (cursor)
- DBUG_RETURN(1);
+ {
+ if (!scan)
+ DBUG_RETURN(1);
+ cursor->restart();
+ }
index_init(table->primary_key);
DBUG_RETURN(0);
}
@@ -1638,11 +2043,25 @@ int ha_ndbcluster::rnd_init(bool scan)
int ha_ndbcluster::close_scan()
{
NdbResultSet *cursor= m_active_cursor;
+ NdbConnection *trans= m_active_trans;
DBUG_ENTER("close_scan");
if (!cursor)
DBUG_RETURN(1);
+
+ if (ops_pending)
+ {
+ /*
+      Take over any pending operations to the
+      deleting/updating transaction before closing the scan
+ */
+ DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ ops_pending= 0;
+ }
+
cursor->close();
m_active_cursor= NULL;
DBUG_RETURN(0);
@@ -1724,7 +2143,7 @@ void ha_ndbcluster::position(const byte *record)
// No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields;
- NdbRecAttr* rec= m_value[hidden_no];
+ NdbRecAttr* rec= m_value[hidden_no].rec;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() &&
@@ -1755,7 +2174,10 @@ void ha_ndbcluster::info(uint flag)
if (flag & HA_STATUS_VARIABLE)
DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
if (flag & HA_STATUS_ERRKEY)
+ {
DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
+ errkey= dupkey;
+ }
if (flag & HA_STATUS_AUTO)
DBUG_PRINT("info", ("HA_STATUS_AUTO"));
DBUG_VOID_RETURN;
@@ -1875,6 +2297,8 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
break;
case HA_EXTRA_CHANGE_KEY_TO_DUP:
DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP"));
+ case HA_EXTRA_KEYREAD_PRESERVE_FIELDS:
+ DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_PRESERVE_FIELDS"));
break;
}
@@ -1898,7 +2322,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
const NDBTAB *tab= (NDBTAB *) m_table;
DBUG_ENTER("start_bulk_insert");
- DBUG_PRINT("enter", ("rows: %d", rows));
+ DBUG_PRINT("enter", ("rows: %d", (int)rows));
rows_inserted= 0;
rows_to_insert= rows;
@@ -1910,7 +2334,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
degrade if too many bytes are inserted, thus it's limited by this
calculation.
*/
- const int bytesperbatch = 8192;
+ const int bytesperbatch= 8192;
bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
batch= bytesperbatch/bytes;
batch= batch == 0 ? 1 : batch;
@@ -1925,15 +2349,32 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
*/
int ha_ndbcluster::end_bulk_insert()
{
+ int error= 0;
+
DBUG_ENTER("end_bulk_insert");
- DBUG_RETURN(0);
+ // Check if last inserts need to be flushed
+ if (bulk_insert_not_flushed)
+ {
+ NdbConnection *trans= m_active_trans;
+ // Send rows to NDB
+ DBUG_PRINT("info", ("Sending inserts to NDB, "\
+ "rows_inserted:%d, bulk_insert_rows: %d",
+ (int)rows_inserted, (int)bulk_insert_rows));
+ bulk_insert_not_flushed= false;
+ if (trans->execute(NoCommit) != 0)
+ error= ndb_err(trans);
+ }
+
+ rows_inserted= 0;
+ rows_to_insert= 1;
+ DBUG_RETURN(error);
}
int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
{
DBUG_ENTER("extra_opt");
- DBUG_PRINT("enter", ("cache_size: %d", cache_size));
+ DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
DBUG_RETURN(extra(operation));
}
@@ -1947,7 +2388,7 @@ int ha_ndbcluster::reset()
const char **ha_ndbcluster::bas_ext() const
-{ static const char *ext[1] = { NullS }; return ext; }
+{ static const char *ext[1]= { NullS }; return ext; }
/*
@@ -2154,7 +2595,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
NdbConnection *tablock_trans=
(NdbConnection*)thd->transaction.all.ndb_tid;
- DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
+ DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
DBUG_ASSERT(tablock_trans);
trans= m_ndb->hupp(tablock_trans);
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
@@ -2189,8 +2630,11 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
if (trans->execute(Commit) != 0)
{
const NdbError err= trans->getNdbError();
+ const NdbOperation *error_op= trans->getNdbErrorOperation();
ERR_PRINT(err);
res= ndb_to_mysql_error(&err);
+ if (res != -1)
+ ndbcluster_print_error(res, error_op);
}
ndb->closeTransaction(trans);
DBUG_RETURN(res);
@@ -2216,8 +2660,11 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction)
if (trans->execute(Rollback) != 0)
{
const NdbError err= trans->getNdbError();
+ const NdbOperation *error_op= trans->getNdbErrorOperation();
ERR_PRINT(err);
res= ndb_to_mysql_error(&err);
+ if (res != -1)
+ ndbcluster_print_error(res, error_op);
}
ndb->closeTransaction(trans);
DBUG_RETURN(0);
@@ -2225,71 +2672,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction)
/*
- Map MySQL type to the corresponding NDB type
+ Define NDB column based on Field.
+ Returns 0 or mysql error code.
+ Not member of ha_ndbcluster because NDBCOL cannot be declared.
*/
-inline NdbDictionary::Column::Type
-mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
+static int create_ndb_column(NDBCOL &col,
+ Field *field,
+ HA_CREATE_INFO *info)
{
- switch(mysql_type) {
+ // Set name
+ col.setName(field->field_name);
+ // Set type and sizes
+ const enum enum_field_types mysql_type= field->real_type();
+ switch (mysql_type) {
+ // Numeric types
case MYSQL_TYPE_DECIMAL:
- return NdbDictionary::Column::Char;
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
case MYSQL_TYPE_TINY:
- return (unsigned_flg) ?
- NdbDictionary::Column::Tinyunsigned :
- NdbDictionary::Column::Tinyint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Tinyunsigned);
+ else
+ col.setType(NDBCOL::Tinyint);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_SHORT:
- return (unsigned_flg) ?
- NdbDictionary::Column::Smallunsigned :
- NdbDictionary::Column::Smallint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Smallunsigned);
+ else
+ col.setType(NDBCOL::Smallint);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_LONG:
- return (unsigned_flg) ?
- NdbDictionary::Column::Unsigned :
- NdbDictionary::Column::Int;
- case MYSQL_TYPE_TIMESTAMP:
- return NdbDictionary::Column::Unsigned;
- case MYSQL_TYPE_LONGLONG:
- return (unsigned_flg) ?
- NdbDictionary::Column::Bigunsigned :
- NdbDictionary::Column::Bigint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Unsigned);
+ else
+ col.setType(NDBCOL::Int);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_INT24:
- return (unsigned_flg) ?
- NdbDictionary::Column::Mediumunsigned :
- NdbDictionary::Column::Mediumint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Mediumunsigned);
+ else
+ col.setType(NDBCOL::Mediumint);
+ col.setLength(1);
+ break;
+ case MYSQL_TYPE_LONGLONG:
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Bigunsigned);
+ else
+ col.setType(NDBCOL::Bigint);
+ col.setLength(1);
break;
case MYSQL_TYPE_FLOAT:
- return NdbDictionary::Column::Float;
+ col.setType(NDBCOL::Float);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_DOUBLE:
- return NdbDictionary::Column::Double;
- case MYSQL_TYPE_DATETIME :
- return NdbDictionary::Column::Datetime;
- case MYSQL_TYPE_DATE :
- case MYSQL_TYPE_NEWDATE :
- case MYSQL_TYPE_TIME :
- case MYSQL_TYPE_YEAR :
- // Missing NDB data types, mapped to char
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_ENUM :
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_SET :
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_TINY_BLOB :
- case MYSQL_TYPE_MEDIUM_BLOB :
- case MYSQL_TYPE_LONG_BLOB :
- case MYSQL_TYPE_BLOB :
- return NdbDictionary::Column::Blob;
- case MYSQL_TYPE_VAR_STRING :
- return NdbDictionary::Column::Varchar;
- case MYSQL_TYPE_STRING :
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_NULL :
- case MYSQL_TYPE_GEOMETRY :
- return NdbDictionary::Column::Undefined;
- }
- return NdbDictionary::Column::Undefined;
+ col.setType(NDBCOL::Double);
+ col.setLength(1);
+ break;
+ // Date types
+ case MYSQL_TYPE_TIMESTAMP:
+ col.setType(NDBCOL::Unsigned);
+ col.setLength(1);
+ break;
+ case MYSQL_TYPE_DATETIME:
+ col.setType(NDBCOL::Datetime);
+ col.setLength(1);
+ break;
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_YEAR:
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ // Char types
+ case MYSQL_TYPE_STRING:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Binary);
+ else
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ case MYSQL_TYPE_VAR_STRING:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Varbinary);
+ else
+ col.setType(NDBCOL::Varchar);
+ col.setLength(field->pack_length());
+ break;
+ // Blob types (all come in as MYSQL_TYPE_BLOB)
+ mysql_type_tiny_blob:
+ case MYSQL_TYPE_TINY_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ col.setInlineSize(256);
+ // No parts
+ col.setPartSize(0);
+ col.setStripeSize(0);
+ break;
+ mysql_type_blob:
+ case MYSQL_TYPE_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ // Use "<=" even if "<" is the exact condition
+ if (field->max_length() <= (1 << 8))
+ goto mysql_type_tiny_blob;
+ else if (field->max_length() <= (1 << 16))
+ {
+ col.setInlineSize(256);
+ col.setPartSize(2000);
+ col.setStripeSize(16);
+ }
+ else if (field->max_length() <= (1 << 24))
+ goto mysql_type_medium_blob;
+ else
+ goto mysql_type_long_blob;
+ break;
+ mysql_type_medium_blob:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ col.setInlineSize(256);
+ col.setPartSize(4000);
+ col.setStripeSize(8);
+ break;
+ mysql_type_long_blob:
+ case MYSQL_TYPE_LONG_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ col.setInlineSize(256);
+ col.setPartSize(8000);
+ col.setStripeSize(4);
+ break;
+ // Other types
+ case MYSQL_TYPE_ENUM:
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ case MYSQL_TYPE_SET:
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ goto mysql_type_unsupported;
+ mysql_type_unsupported:
+ default:
+ return HA_ERR_UNSUPPORTED;
+ }
+ // Set nullable and pk
+ col.setNullable(field->maybe_null());
+ col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
+ // Set autoincrement
+ if (field->flags & AUTO_INCREMENT_FLAG)
+ {
+ col.setAutoIncrement(TRUE);
+ ulonglong value= info->auto_increment_value ?
+ info->auto_increment_value -1 : (ulonglong) 0;
+ DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
+ col.setAutoIncrementInitialValue(value);
+ }
+ else
+ col.setAutoIncrement(false);
+ return 0;
}
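The blob layout chosen above, isolated: the first inlineSize bytes of a blob live in the main row, the remainder is split into partSize-byte part rows, and stripeSize controls how the parts are distributed. The selection reduces to one function (sketch; bounds and sizes are exactly those in this patch):

    // The blob size selection above as one function (sketch).
    static void choose_blob_sizes(unsigned max_length, unsigned *inline_size,
                                  unsigned *part_size, unsigned *stripe_size)
    {
      *inline_size= 256;
      if (max_length <= (1u << 8))       { *part_size= 0;    *stripe_size= 0;  }
      else if (max_length <= (1u << 16)) { *part_size= 2000; *stripe_size= 16; }
      else if (max_length <= (1u << 24)) { *part_size= 4000; *stripe_size= 8;  }
      else                               { *part_size= 8000; *stripe_size= 4;  }
    }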
-
/*
Create a table in NDB Cluster
*/
@@ -2299,7 +2859,6 @@ int ha_ndbcluster::create(const char *name,
HA_CREATE_INFO *info)
{
NDBTAB tab;
- NdbDictionary::Column::Type ndb_type;
NDBCOL col;
uint pack_length, length, i;
const void *data, *pack_data;
@@ -2330,31 +2889,11 @@ int ha_ndbcluster::create(const char *name,
for (i= 0; i < form->fields; i++)
{
Field *field= form->field[i];
- ndb_type= mysql_to_ndb_type(field->real_type(),
- field->flags & UNSIGNED_FLAG);
DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
field->field_name, field->real_type(),
field->pack_length()));
- col.setName(field->field_name);
- col.setType(ndb_type);
- if ((ndb_type == NdbDictionary::Column::Char) ||
- (ndb_type == NdbDictionary::Column::Varchar))
- col.setLength(field->pack_length());
- else
- col.setLength(1);
- col.setNullable(field->maybe_null());
- col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
- if (field->flags & AUTO_INCREMENT_FLAG)
- {
- col.setAutoIncrement(TRUE);
- ulonglong value= info->auto_increment_value ?
- info->auto_increment_value -1 : (ulonglong) 0;
- DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
- col.setAutoIncrementInitialValue(value);
- }
- else
- col.setAutoIncrement(false);
-
+ if ((my_errno= create_ndb_column(col, field, info)))
+ DBUG_RETURN(my_errno);
tab.addColumn(col);
}
@@ -2389,50 +2928,10 @@ int ha_ndbcluster::create(const char *name,
}
DBUG_PRINT("info", ("Table %s/%s created successfully",
m_dbname, m_tabname));
-
- if ((my_errno= build_index_list()))
- DBUG_RETURN(my_errno);
-
- // Create secondary indexes
- KEY* key_info= form->key_info;
- const char** key_name= key_names;
- for (i= 0; i < form->keys; i++, key_info++, key_name++)
- {
- int error= 0;
- DBUG_PRINT("info", ("Index %u: %s", i, *key_name));
-
- switch (get_index_type_from_table(i)){
- case PRIMARY_KEY_INDEX:
- // Do nothing, already created
- break;
- case PRIMARY_KEY_ORDERED_INDEX:
- error= create_ordered_index(*key_name, key_info);
- break;
- case UNIQUE_ORDERED_INDEX:
- if (!(error= create_ordered_index(*key_name, key_info)))
- error= create_unique_index(get_unique_index_name(i), key_info);
- break;
- case UNIQUE_INDEX:
- error= create_unique_index(get_unique_index_name(i), key_info);
- break;
- case ORDERED_INDEX:
- error= create_ordered_index(*key_name, key_info);
- break;
- default:
- DBUG_ASSERT(false);
- break;
- }
+ // Create secondary indexes
+ my_errno= build_index_list(form, ILBP_CREATE);
- if (error)
- {
- DBUG_PRINT("error", ("Failed to create index %u", i));
- drop_table();
- my_errno= error;
- break;
- }
- }
-
DBUG_RETURN(my_errno);
}
@@ -2468,6 +2967,7 @@ int ha_ndbcluster::create_index(const char *name,
DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name));
NdbDictionary::Index ndb_index(name);
if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
@@ -2601,10 +3101,17 @@ int ndbcluster_drop_database(const char *path)
longlong ha_ndbcluster::get_auto_increment()
{
- int cache_size = rows_to_insert ? rows_to_insert : 32;
+ DBUG_ENTER("get_auto_increment");
+ DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
+ int cache_size=
+ (rows_to_insert > autoincrement_prefetch) ?
+ rows_to_insert
+ : autoincrement_prefetch;
Uint64 auto_value=
- m_ndb->getAutoIncrementValue(m_tabname, cache_size);
- return (longlong)auto_value;
+ (skip_auto_increment) ?
+ m_ndb->readAutoIncrementValue((NDBTAB *) m_table)
+ : m_ndb->getAutoIncrementValue((NDBTAB *) m_table, cache_size);
+ DBUG_RETURN((longlong)auto_value);
}
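The cache sizing above in isolation: prefetch at least autoincrement_prefetch values, or more when start_bulk_insert() announced a larger row count; readAutoIncrementValue (taken when the statement supplies explicit values) peeks at the counter without consuming from it. A sketch:

    // Sketch of the prefetch sizing in get_auto_increment().
    static int autoinc_cache_size(ha_rows rows_to_insert)
    {
      return (rows_to_insert > autoincrement_prefetch)
               ? (int) rows_to_insert
               : (int) autoincrement_prefetch;
    }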
@@ -2619,15 +3126,20 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_ndb(NULL),
m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
+ HA_NULL_IN_KEY |
HA_NOT_EXACT_COUNT |
- HA_NO_PREFIX_CHAR_KEYS |
- HA_NO_BLOBS),
+ HA_NO_PREFIX_CHAR_KEYS),
m_use_write(false),
retrieve_all_fields(FALSE),
- rows_to_insert(0),
+ rows_to_insert(1),
rows_inserted(0),
bulk_insert_rows(1024),
- ops_pending(0)
+ bulk_insert_not_flushed(false),
+ ops_pending(0),
+ skip_auto_increment(true),
+ blobs_buffer(0),
+ blobs_buffer_size(0),
+ dupkey((uint) -1)
{
int i;
@@ -2643,8 +3155,10 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
for (i= 0; i < MAX_KEY; i++)
{
- m_indextype[i]= UNDEFINED_INDEX;
- m_unique_index_name[i]= NULL;
+ m_index[i].type= UNDEFINED_INDEX;
+ m_index[i].unique_name= NULL;
+ m_index[i].unique_index= NULL;
+ m_index[i].index= NULL;
}
DBUG_VOID_RETURN;
@@ -2660,6 +3174,8 @@ ha_ndbcluster::~ha_ndbcluster()
DBUG_ENTER("~ha_ndbcluster");
release_metadata();
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ blobs_buffer= 0;
// Check for open cursor/transaction
DBUG_ASSERT(m_active_cursor == NULL);
@@ -2888,6 +3404,12 @@ int ndb_discover_tables()
bool ndbcluster_init()
{
DBUG_ENTER("ndbcluster_init");
+ // Set connectstring if specified
+ if (ndbcluster_connectstring != 0)
+ {
+ DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring));
+ Ndb::setConnectString(ndbcluster_connectstring);
+ }
// Create a Ndb object to open the connection to NDB
g_ndb= new Ndb("sys");
if (g_ndb->init() != 0)
@@ -2921,6 +3443,7 @@ bool ndbcluster_init()
bool ndbcluster_end()
{
DBUG_ENTER("ndbcluster_end");
+
delete g_ndb;
g_ndb= NULL;
if (!ndbcluster_inited)
@@ -2934,13 +3457,22 @@ bool ndbcluster_end()
DBUG_RETURN(0);
}
-void ndbcluster_print_error(int error)
+/*
+ Static error print function called from
+  static handler methods ndbcluster_commit
+ and ndbcluster_rollback
+*/
+
+void ndbcluster_print_error(int error, const NdbOperation *error_op)
{
DBUG_ENTER("ndbcluster_print_error");
TABLE tab;
- tab.table_name = NULL;
+ const char *tab_name= (error_op) ? error_op->getTableName() : "";
+ tab.table_name= (char *) tab_name;
ha_ndbcluster error_handler(&tab);
+ tab.file= &error_handler;
error_handler.print_error(error, MYF(0));
+ DBUG_VOID_RETURN;
}
/*
@@ -2965,7 +3497,7 @@ void ha_ndbcluster::set_tabname(const char *path_name)
ptr= m_tabname;
while (*ptr != '\0') {
- *ptr = tolower(*ptr);
+ *ptr= tolower(*ptr);
ptr++;
}
#endif
@@ -2981,17 +3513,17 @@ ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
char *end, *ptr;
/* Scan name from the end */
- end = strend(path_name)-1;
- ptr = end;
+ end= strend(path_name)-1;
+ ptr= end;
while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
ptr--;
}
- uint name_len = end - ptr;
+ uint name_len= end - ptr;
memcpy(tabname, ptr + 1, end - ptr);
- tabname[name_len] = '\0';
+ tabname[name_len]= '\0';
#ifdef __WIN__
/* Put to lower case */
- ptr = tabname;
+ ptr= tabname;
while (*ptr != '\0') {
*ptr= tolower(*ptr);
@@ -3154,7 +3686,7 @@ static int packfrm(const void *data, uint len,
DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
error= 1;
- org_len = len;
+ org_len= len;
if (my_compress((byte*)data, &org_len, &comp_len))
goto err;
@@ -3174,9 +3706,9 @@ static int packfrm(const void *data, uint len,
// Copy frm data into blob, already in machine independent format
memcpy(blob->data, data, org_len);
- *pack_data = blob;
- *pack_len = blob_len;
- error = 0;
+ *pack_data= blob;
+ *pack_len= blob_len;
+ error= 0;
DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
err:
@@ -3188,7 +3720,7 @@ err:
static int unpackfrm(const void **unpack_data, uint *unpack_len,
const void *pack_data)
{
- const frm_blob_struct *blob = (frm_blob_struct*)pack_data;
+ const frm_blob_struct *blob= (frm_blob_struct*)pack_data;
byte *data;
ulong complen, orglen, ver;
DBUG_ENTER("unpackfrm");
@@ -3204,7 +3736,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
if (ver != 1)
DBUG_RETURN(1);
- if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME))))
+ if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
DBUG_RETURN(2);
memcpy(data, blob->data, complen);
@@ -3214,8 +3746,8 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
DBUG_RETURN(3);
}
- *unpack_data = data;
- *unpack_len = complen;
+ *unpack_data= data;
+ *unpack_len= complen;
DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));