diff options
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r-- | sql/ha_ndbcluster.cc | 1196 |
1 files changed, 864 insertions, 332 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 5b36d6d2b55..09cb0c0f02d 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -41,7 +41,13 @@ static const int parallelism= 240; // Default value for max number of transactions // createable against NDB from this handler -static const int max_transactions = 256; +static const int max_transactions= 256; + +// Default value for prefetch of autoincrement values +static const ha_rows autoincrement_prefetch= 32; + +// connectstring to cluster if given by mysqld +const char *ndbcluster_connectstring= 0; #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 @@ -138,6 +144,7 @@ static int ndb_to_mysql_error(const NdbError *err) int ha_ndbcluster::ndb_err(NdbConnection *trans) { + int res; const NdbError err= trans->getNdbError(); if (!err.code) return 0; // Don't log things to DBUG log if no error @@ -155,7 +162,13 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans) default: break; } - DBUG_RETURN(ndb_to_mysql_error(&err)); + res= ndb_to_mysql_error(&err); + DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d", + err.code, res)); + if (res == HA_ERR_FOUND_DUPP_KEY) + dupkey= table->primary_key; + + DBUG_RETURN(res); } @@ -182,6 +195,45 @@ bool ha_ndbcluster::get_error_message(int error, /* + Check if type is supported by NDB. + TODO Use this once, not in every operation +*/ + +static inline bool ndb_supported_type(enum_field_types type) +{ + switch (type) { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_LONGLONG: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + return true; + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + break; + } + return false; +} + + +/* Instruct NDB to set the value of the hidden primary key */ @@ -208,40 +260,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field, pack_len)); DBUG_DUMP("key", (char*)field_ptr, pack_len); - switch (field->type()) { - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_TIMESTAMP: - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_YEAR: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_SET: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: - // Common implementation for most field types - DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); - - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_NULL: - case MYSQL_TYPE_GEOMETRY: - default: - // Unhandled field types - DBUG_PRINT("error", ("Field type %d not supported", field->type())); - DBUG_RETURN(2); + if (ndb_supported_type(field->type())) + { + if (! (field->flags & BLOB_FLAG)) + // Common implementation for most field types + DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); } - DBUG_RETURN(3); + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); } @@ -259,63 +286,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, fieldnr, field->field_name, field->type(), pack_len, field->is_null()?"Y":"N")); DBUG_DUMP("value", (char*) field_ptr, pack_len); - - if (field->is_null()) + + if (ndb_supported_type(field->type())) { - // Set value to NULL - DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); - } - - switch (field->type()) { - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_TIMESTAMP: - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_YEAR: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_SET: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: - // Common implementation for most field types - DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); - - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_NULL: - case MYSQL_TYPE_GEOMETRY: - default: - // Unhandled field types - DBUG_PRINT("error", ("Field type %d not supported", field->type())); - DBUG_RETURN(2); + if (! (field->flags & BLOB_FLAG)) + { + if (field->is_null()) + // Set value to NULL + DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); + // Common implementation for most field types + DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); + } + + // Blob type + NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr); + if (ndb_blob != NULL) + { + if (field->is_null()) + DBUG_RETURN(ndb_blob->setNull() != 0); + + Field_blob *field_blob= (Field_blob*)field; + + // Get length and pointer to data + uint32 blob_len= field_blob->get_length(field_ptr); + char* blob_ptr= NULL; + field_blob->get_ptr(&blob_ptr); + + // Looks like NULL blob can also be signaled in this way + if (blob_ptr == NULL) + DBUG_RETURN(ndb_blob->setNull() != 0); + + DBUG_PRINT("value", ("set blob ptr=%x len=%u", + (unsigned)blob_ptr, blob_len)); + DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26)); + + // No callback needed to write value + DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0); + } + DBUG_RETURN(1); } - DBUG_RETURN(3); + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); +} + + +/* + Callback to read all blob values. + - not done in unpack_record because unpack_record is valid + after execute(Commit) but reading blobs is not + - may only generate read operations; they have to be executed + somewhere before the data is available + - due to single buffer for all blobs, we let the last blob + process all blobs (last so that all are active) + - null bit is still set in unpack_record + - TODO allocate blob part aligned buffers +*/ + +NdbBlob::ActiveHook g_get_ndb_blobs_value; + +int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) +{ + DBUG_ENTER("g_get_ndb_blobs_value"); + if (ndb_blob->blobsNextBlob() != NULL) + DBUG_RETURN(0); + ha_ndbcluster *ha= (ha_ndbcluster *)arg; + DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob)); +} + +int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) +{ + DBUG_ENTER("get_ndb_blobs_value"); + + // Field has no field number so cannot use TABLE blob_field + // Loop twice, first only counting total buffer size + for (int loop= 0; loop <= 1; loop++) + { + uint32 offset= 0; + for (uint i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + NdbValue value= m_value[i]; + if (value.ptr != NULL && (field->flags & BLOB_FLAG)) + { + Field_blob *field_blob= (Field_blob *)field; + NdbBlob *ndb_blob= value.blob; + Uint64 blob_len= 0; + if (ndb_blob->getLength(blob_len) != 0) + DBUG_RETURN(-1); + // Align to Uint64 + uint32 blob_size= blob_len; + if (blob_size % 8 != 0) + blob_size+= 8 - blob_size % 8; + if (loop == 1) + { + char *buf= blobs_buffer + offset; + uint32 len= 0xffffffff; // Max uint32 + DBUG_PRINT("value", ("read blob ptr=%x len=%u", + (uint)buf, (uint)blob_len)); + if (ndb_blob->readData(buf, len) != 0) + DBUG_RETURN(-1); + DBUG_ASSERT(len == blob_len); + field_blob->set_ptr(len, buf); + } + offset+= blob_size; + } + } + if (loop == 0 && offset > blobs_buffer_size) + { + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + blobs_buffer_size= 0; + DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); + blobs_buffer= my_malloc(offset, MYF(MY_WME)); + if (blobs_buffer == NULL) + DBUG_RETURN(-1); + blobs_buffer_size= offset; + } + } + DBUG_RETURN(0); } /* Instruct NDB to fetch one field - - data is read directly into buffer provided by field_ptr - if it's NULL, data is read into memory provided by NDBAPI + - data is read directly into buffer provided by field + if field is NULL, data is read into memory provided by NDBAPI */ -int ha_ndbcluster::get_ndb_value(NdbOperation *op, - uint field_no, byte *field_ptr) +int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field, + uint fieldnr) { DBUG_ENTER("get_ndb_value"); - DBUG_PRINT("enter", ("field_no: %d", field_no)); - m_value[field_no]= op->getValue(field_no, field_ptr); - DBUG_RETURN(m_value == NULL); + DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr, + (int)(field != NULL ? field->flags : 0))); + + if (field != NULL) + { + if (ndb_supported_type(field->type())) + { + DBUG_ASSERT(field->ptr != NULL); + if (! (field->flags & BLOB_FLAG)) + { + m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr); + DBUG_RETURN(m_value[fieldnr].rec == NULL); + } + + // Blob type + NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr); + m_value[fieldnr].blob= ndb_blob; + if (ndb_blob != NULL) + { + // Set callback + void *arg= (void *)this; + DBUG_RETURN(ndb_blob->setActiveHook(g_get_ndb_blobs_value, arg) != 0); + } + DBUG_RETURN(1); + } + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); + } + + // Used for hidden key only + m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL); + DBUG_RETURN(m_value[fieldnr].rec == NULL); +} + + +/* + Check if any set or get of blob value in current query. +*/ +bool ha_ndbcluster::uses_blob_value(bool all_fields) +{ + if (table->blob_fields == 0) + return false; + if (all_fields) + return true; + { + uint no_fields= table->fields; + int i; + THD *thd= current_thd; + // They always put blobs at the end.. + for (i= no_fields - 1; i >= 0; i--) + { + Field *field= table->field[i]; + if (thd->query_id == field->query_id) + { + return true; + } + } + } + return false; } @@ -391,41 +552,95 @@ int ha_ndbcluster::get_metadata(const char *path) // All checks OK, lets use the table m_table= (void*)tab; - DBUG_RETURN(build_index_list()); + DBUG_RETURN(build_index_list(table, ILBP_OPEN)); } -int ha_ndbcluster::build_index_list() + +int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase) { + int error= 0; char *name; const char *index_name; static const char* unique_suffix= "$unique"; uint i, name_len; + KEY* key_info= tab->key_info; + const char **key_name= tab->keynames.type_names; + NdbDictionary::Dictionary *dict= m_ndb->getDictionary(); DBUG_ENTER("build_index_list"); // Save information about all known indexes - for (uint i= 0; i < table->keys; i++) + for (i= 0; i < tab->keys; i++, key_info++, key_name++) { + index_name= *key_name; NDB_INDEX_TYPE idx_type= get_index_type_from_table(i); - m_indextype[i]= idx_type; - + m_index[i].type= idx_type; if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX) { - index_name= get_index_name(i); name_len= strlen(index_name)+strlen(unique_suffix)+1; // Create name for unique index by appending "$unique"; if (!(name= my_malloc(name_len, MYF(MY_WME)))) DBUG_RETURN(2); strxnmov(name, name_len, index_name, unique_suffix, NullS); - m_unique_index_name[i]= name; + m_index[i].unique_name= name; DBUG_PRINT("info", ("Created unique index name: %s for index %d", name, i)); } + // Create secondary indexes if in create phase + if (phase == ILBP_CREATE) + { + DBUG_PRINT("info", ("Creating index %u: %s", i, index_name)); + + switch (m_index[i].type){ + + case PRIMARY_KEY_INDEX: + // Do nothing, already created + break; + case PRIMARY_KEY_ORDERED_INDEX: + error= create_ordered_index(index_name, key_info); + break; + case UNIQUE_ORDERED_INDEX: + if (!(error= create_ordered_index(index_name, key_info))) + error= create_unique_index(get_unique_index_name(i), key_info); + break; + case UNIQUE_INDEX: + error= create_unique_index(get_unique_index_name(i), key_info); + break; + case ORDERED_INDEX: + error= create_ordered_index(index_name, key_info); + break; + default: + DBUG_ASSERT(false); + break; + } + if (error) + { + DBUG_PRINT("error", ("Failed to create index %u", i)); + drop_table(); + break; + } + } + // Add handles to index objects + DBUG_PRINT("info", ("Trying to add handle to index %s", index_name)); + if ((m_index[i].type != PRIMARY_KEY_INDEX) && + (m_index[i].type != UNIQUE_INDEX)) + { + const NDBINDEX *index= dict->getIndex(index_name, m_tabname); + if (!index) DBUG_RETURN(1); + m_index[i].index= (void *) index; + } + if (m_index[i].unique_name) + { + const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname); + if (!index) DBUG_RETURN(1); + m_index[i].unique_index= (void *) index; + } + DBUG_PRINT("info", ("Added handle to index %s", index_name)); } - DBUG_RETURN(0); + + DBUG_RETURN(error); } - /* Decode the type of an index from information provided in table object @@ -454,18 +669,29 @@ void ha_ndbcluster::release_metadata() // Release index list for (i= 0; i < MAX_KEY; i++) { - if (m_unique_index_name[i]) - my_free((char*)m_unique_index_name[i], MYF(0)); - m_unique_index_name[i]= NULL; + if (m_index[i].unique_name) + my_free((char*)m_index[i].unique_name, MYF(0)); + m_index[i].unique_name= NULL; + m_index[i].unique_index= NULL; + m_index[i].index= NULL; } DBUG_VOID_RETURN; } -NdbCursorOperation::LockMode get_ndb_lock_type(enum thr_lock_type type) +int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) { - return (type == TL_WRITE_ALLOW_WRITE) ? - NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read; + int lm; + if (type == TL_WRITE_ALLOW_WRITE) + lm= NdbScanOperation::LM_Exclusive; + else if (uses_blob_value(retrieve_all_fields)) + /* + TODO use a new scan mode to read + lock + keyinfo + */ + lm= NdbScanOperation::LM_Exclusive; + else + lm= NdbScanOperation::LM_CommittedRead; + return lm; } static const ulong index_type_flags[]= @@ -507,13 +733,13 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const { - return m_unique_index_name[idx_no]; + return m_index[idx_no].unique_name; } inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const { DBUG_ASSERT(idx_no < MAX_KEY); - return m_indextype[idx_no]; + return m_index[idx_no].type; } @@ -593,7 +819,7 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op) Read one record from NDB using primary key */ -int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) +int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) { uint no_fields= table->fields, i; NdbConnection *trans= m_active_trans; @@ -603,8 +829,9 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) DBUG_PRINT("enter", ("key_len: %u", key_len)); DBUG_DUMP("key", (char*)key, key_len); - if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0) - goto err; + if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) || + op->readTuple() != 0) + ERR_RETURN(trans->getNdbError()); if (table->primary_key == MAX_KEY) { @@ -612,10 +839,11 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) DBUG_PRINT("info", ("Using hidden key")); DBUG_DUMP("key", (char*)key, 8); if (set_hidden_key(op, no_fields, key)) - goto err; + ERR_RETURN(trans->getNdbError()); + // Read key at the same time, for future reference - if (get_ndb_value(op, no_fields, NULL)) - goto err; + if (get_ndb_value(op, NULL, no_fields)) + ERR_RETURN(trans->getNdbError()); } else { @@ -624,19 +852,20 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) return res; } - // Read non-key field(s) + // Read all wanted non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS for (i= 0; i < no_fields; i++) { Field *field= table->field[i]; - if (thd->query_id == field->query_id) + if ((thd->query_id == field->query_id) || + retrieve_all_fields) { - if (get_ndb_value(op, i, field->ptr)) - goto err; + if (get_ndb_value(op, field, i)) + ERR_RETURN(trans->getNdbError()); } else { // Attribute was not to be read - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -650,9 +879,55 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) unpack_record(buf); table->status= 0; DBUG_RETURN(0); +} + + +/* + Read one complementing record from NDB using primary key from old_data +*/ + +int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) +{ + uint no_fields= table->fields, i; + NdbConnection *trans= m_active_trans; + NdbOperation *op; + THD *thd= current_thd; + DBUG_ENTER("complemented_pk_read"); + + if (retrieve_all_fields) + // We have allready retrieved all fields, nothing to complement + DBUG_RETURN(0); + + if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) || + op->readTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + int res; + if ((res= set_primary_key_from_old_data(op, old_data))) + ERR_RETURN(trans->getNdbError()); + + // Read all unreferenced non-key field(s) + for (i= 0; i < no_fields; i++) + { + Field *field= table->field[i]; + if (!(field->flags & PRI_KEY_FLAG) && + (thd->query_id != field->query_id)) + { + if (get_ndb_value(op, field, i)) + ERR_RETURN(trans->getNdbError()); + } + } + + if (trans->execute(NoCommit) != 0) + { + table->status= STATUS_NOT_FOUND; + DBUG_RETURN(ndb_err(trans)); + } - err: - ERR_RETURN(trans->getNdbError()); + // The value have now been fetched from NDB + unpack_record(new_data); + table->status= 0; + DBUG_RETURN(0); } @@ -675,8 +950,9 @@ int ha_ndbcluster::unique_index_read(const byte *key, DBUG_DUMP("key", (char*)key, key_len); DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index))); - if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index), - m_tabname)) || + if (!(op= trans->getNdbIndexOperation((NDBINDEX *) + m_index[active_index].unique_index, + (NDBTAB *) m_table)) || op->readTuple() != 0) ERR_RETURN(trans->getNdbError()); @@ -700,13 +976,13 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((thd->query_id == field->query_id) || (field->flags & PRI_KEY_FLAG)) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } else { // Attribute was not to be read - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -746,14 +1022,25 @@ inline int ha_ndbcluster::next_result(byte *buf) If this an update or delete, call nextResult with false to process any records already cached in NdbApi */ - bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE; + bool contact_ndb= m_lock.type != TL_WRITE_ALLOW_WRITE; do { DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); + /* + We can only handle one tuple with blobs at a time. + */ + if (ops_pending && blobs_pending) + { + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + ops_pending= 0; + blobs_pending= false; + } check= cursor->nextResult(contact_ndb); if (check == 0) { // One more record found DBUG_PRINT("info", ("One more record found")); + unpack_record(buf); table->status= 0; DBUG_RETURN(0); @@ -791,15 +1078,17 @@ inline int ha_ndbcluster::next_result(byte *buf) Set bounds for a ordered index scan, use key_range */ -int ha_ndbcluster::set_bounds(NdbOperation *op, +int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, const key_range *key, int bound) { - uint i, tot_len; + uint key_len, key_store_len, tot_len, key_tot_len; byte *key_ptr; KEY* key_info= table->key_info + active_index; KEY_PART_INFO* key_part= key_info->key_part; KEY_PART_INFO* end= key_part+key_info->key_parts; + Field* field; + bool key_nullable, key_null; DBUG_ENTER("set_bounds"); DBUG_PRINT("enter", ("bound: %d", bound)); @@ -809,29 +1098,37 @@ int ha_ndbcluster::set_bounds(NdbOperation *op, // Set bounds using key data tot_len= 0; - key_ptr= (byte *) key->key; + key_ptr= (byte *) key->key; + key_tot_len= key->length; for (; key_part != end; key_part++) { - Field* field= key_part->field; - uint32 field_len= field->pack_length(); - tot_len+= field_len; + field= key_part->field; + key_len= key_part->length; + key_store_len= key_part->store_length; + key_nullable= (bool) key_part->null_bit; + key_null= (field->maybe_null() && *key_ptr); + tot_len+= key_store_len; const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"}; DBUG_ASSERT(bound >= 0 && bound <= 4); - DBUG_PRINT("info", ("Set Bound%s on %s", + DBUG_PRINT("info", ("Set Bound%s on %s %s %s %s", bounds[bound], - field->field_name)); - DBUG_DUMP("key", (char*)key_ptr, field_len); - + field->field_name, + key_nullable ? "NULLABLE" : "", + key_null ? "NULL":"")); + DBUG_PRINT("info", ("Total length %ds", tot_len)); + + DBUG_DUMP("key", (char*) key_ptr, key_store_len); + if (op->setBound(field->field_name, bound, - key_ptr, - field_len) != 0) + key_null ? 0 : (key_nullable ? key_ptr + 1 : key_ptr), + key_null ? 0 : key_len) != 0) ERR_RETURN(op->getNdbError()); - key_ptr+= field_len; - - if (tot_len >= key->length) + key_ptr+= key_store_len; + + if (tot_len >= key_tot_len) break; /* @@ -839,7 +1136,7 @@ int ha_ndbcluster::set_bounds(NdbOperation *op, so if this bound was not EQ, bail out and make a best effort attempt */ - if (bound != NdbOperation::BoundEQ) + if (bound != NdbIndexScanOperation::BoundEQ) break; } @@ -857,7 +1154,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, { NdbConnection *trans= m_active_trans; NdbResultSet *cursor; - NdbScanOperation *op; + NdbIndexScanOperation *op; const char *index_name; DBUG_ENTER("ordered_index_scan"); @@ -865,19 +1162,24 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); index_name= get_index_name(active_index); - if (!(op= trans->getNdbScanOperation(index_name, m_tabname))) + if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *) + m_index[active_index].index, + (NDBTAB *) m_table))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type)))) + + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism, sorted))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; if (start_key && set_bounds(op, start_key, (start_key->flag == HA_READ_KEY_EXACT) ? - NdbOperation::BoundEQ : + NdbIndexScanOperation::BoundEQ : (start_key->flag == HA_READ_AFTER_KEY) ? - NdbOperation::BoundLT : - NdbOperation::BoundLE)) + NdbIndexScanOperation::BoundLT : + NdbIndexScanOperation::BoundLE)) DBUG_RETURN(1); if (end_key) @@ -888,8 +1190,8 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, } else if (set_bounds(op, end_key, (end_key->flag == HA_READ_AFTER_KEY) ? - NdbOperation::BoundGE : - NdbOperation::BoundGT)) + NdbIndexScanOperation::BoundGE : + NdbIndexScanOperation::BoundGT)) DBUG_RETURN(1); } DBUG_RETURN(define_read_attrs(buf, op)); @@ -925,12 +1227,14 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, DBUG_PRINT("info", ("Starting a new filtered scan on %s", m_tabname)); - if (!(op= trans->getNdbScanOperation(m_tabname))) + if (!(op= trans->getNdbScanOperation((NDBTAB *) m_table))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type)))) + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; - + { // Start scan filter NdbScanFilter sf(op); @@ -994,9 +1298,11 @@ int ha_ndbcluster::full_table_scan(byte *buf) DBUG_ENTER("full_table_scan"); DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); - if (!(op=trans->getNdbScanOperation(m_tabname))) + if (!(op=trans->getNdbScanOperation((NDBTAB *) m_table))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type)))) + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; DBUG_RETURN(define_read_attrs(buf, op)); @@ -1020,12 +1326,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) (field->flags & PRI_KEY_FLAG) || retrieve_all_fields) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } else { - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -1039,7 +1345,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) if (!tab->getColumn(hidden_no)) DBUG_RETURN(1); #endif - if (get_ndb_value(op, hidden_no, NULL)) + if (get_ndb_value(op, NULL, hidden_no)) ERR_RETURN(op->getNdbError()); } @@ -1056,6 +1362,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) int ha_ndbcluster::write_row(byte *record) { + bool has_auto_increment; uint i; NdbConnection *trans= m_active_trans; NdbOperation *op; @@ -1065,10 +1372,10 @@ int ha_ndbcluster::write_row(byte *record) statistic_increment(ha_write_count,&LOCK_status); if (table->timestamp_default_now) update_timestamp(record+table->timestamp_default_now-1); - if (table->next_number_field && record == table->record[0]) - update_auto_increment(); + has_auto_increment= (table->next_number_field && record == table->record[0]); + skip_auto_increment= table->auto_increment_field_not_null; - if (!(op= trans->getNdbOperation(m_tabname))) + if (!(op= trans->getNdbOperation((NDBTAB *) m_table))) ERR_RETURN(trans->getNdbError()); res= (m_use_write) ? op->writeTuple() :op->insertTuple(); @@ -1078,13 +1385,17 @@ int ha_ndbcluster::write_row(byte *record) if (table->primary_key == MAX_KEY) { // Table has hidden primary key - Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname); + Uint64 auto_value= m_ndb->getAutoIncrementValue((NDBTAB *) m_table); if (set_hidden_key(op, table->fields, (const byte*)&auto_value)) ERR_RETURN(op->getNdbError()); } else { int res; + + if ((has_auto_increment) && (!skip_auto_increment)) + update_auto_increment(); + if ((res= set_primary_key(op))) return res; } @@ -1095,7 +1406,10 @@ int ha_ndbcluster::write_row(byte *record) Field *field= table->field[i]; if (!(field->flags & PRI_KEY_FLAG) && set_ndb_value(op, field, i)) + { + skip_auto_increment= true; ERR_RETURN(op->getNdbError()); + } } /* @@ -1106,16 +1420,34 @@ int ha_ndbcluster::write_row(byte *record) Find out how this is detected! */ rows_inserted++; - if ((rows_inserted == rows_to_insert) || - ((rows_inserted % bulk_insert_rows) == 0)) + bulk_insert_not_flushed= true; + if ((rows_to_insert == 1) || + ((rows_inserted % bulk_insert_rows) == 0) || + uses_blob_value(false) != 0) { // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", - rows_inserted, bulk_insert_rows)); + (int)rows_inserted, (int)bulk_insert_rows)); + bulk_insert_not_flushed= false; if (trans->execute(NoCommit) != 0) + { + skip_auto_increment= true; DBUG_RETURN(ndb_err(trans)); + } } + if ((has_auto_increment) && (skip_auto_increment)) + { + Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1; + DBUG_PRINT("info", + ("Trying to set next auto increment value to %lu", + (ulong) next_val)); + if (m_ndb->setAutoIncrementValue((NDBTAB *) m_table, next_val, true)) + DBUG_PRINT("info", + ("Setting next auto increment value to %u", next_val)); + } + skip_auto_increment= true; + DBUG_RETURN(0); } @@ -1171,11 +1503,40 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) if (table->timestamp_on_update_now) update_timestamp(new_data+table->timestamp_on_update_now-1); - /* Check for update of primary key and return error */ + /* Check for update of primary key for special handling */ if ((table->primary_key != MAX_KEY) && (key_cmp(table->primary_key, old_data, new_data))) - DBUG_RETURN(HA_ERR_UNSUPPORTED); - + { + int read_res, insert_res, delete_res; + + DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete")); + // Get all old fields, since we optimize away fields not in query + read_res= complemented_pk_read(old_data, new_data); + if (read_res) + { + DBUG_PRINT("info", ("pk read failed")); + DBUG_RETURN(read_res); + } + // Insert new row + insert_res= write_row(new_data); + if (insert_res) + { + DBUG_PRINT("info", ("insert failed")); + DBUG_RETURN(insert_res); + } + // Delete old row + DBUG_PRINT("info", ("insert succeded")); + delete_res= delete_row(old_data); + if (delete_res) + { + DBUG_PRINT("info", ("delete failed")); + // Undo write_row(new_data) + DBUG_RETURN(delete_row(new_data)); + } + DBUG_PRINT("info", ("insert+delete succeeded")); + DBUG_RETURN(0); + } + if (cursor) { /* @@ -1189,10 +1550,12 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) if (!(op= cursor->updateTuple())) ERR_RETURN(trans->getNdbError()); ops_pending++; + if (uses_blob_value(false)) + blobs_pending= true; } else { - if (!(op= trans->getNdbOperation(m_tabname)) || + if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) || op->updateTuple() != 0) ERR_RETURN(trans->getNdbError()); @@ -1204,7 +1567,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) // Require that the PK for this record has previously been // read into m_value uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; + NdbRecAttr* rec= m_value[no_fields].rec; DBUG_ASSERT(rec); DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); @@ -1253,9 +1616,9 @@ int ha_ndbcluster::delete_row(const byte *record) if (cursor) { /* - We are scanning records and want to update the record + We are scanning records and want to delete the record that was just found, call deleteTuple on the cursor - to take over the lock to a new update operation + to take over the lock to a new delete operation And thus setting the primary key of the record from the active record in cursor */ @@ -1270,7 +1633,7 @@ int ha_ndbcluster::delete_row(const byte *record) else { - if (!(op=trans->getNdbOperation(m_tabname)) || + if (!(op=trans->getNdbOperation((NDBTAB *) m_table)) || op->deleteTuple() != 0) ERR_RETURN(trans->getNdbError()); @@ -1279,7 +1642,7 @@ int ha_ndbcluster::delete_row(const byte *record) // This table has no primary key, use "hidden" primary key DBUG_PRINT("info", ("Using hidden key")); uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; + NdbRecAttr* rec= m_value[no_fields].rec; DBUG_ASSERT(rec != NULL); if (set_hidden_key(op, no_fields, rec->aRef())) @@ -1317,7 +1680,7 @@ void ha_ndbcluster::unpack_record(byte* buf) { uint row_offset= (uint) (buf - table->record[0]); Field **field, **end; - NdbRecAttr **value= m_value; + NdbValue *value= m_value; DBUG_ENTER("unpack_record"); // Set null flag(s) @@ -1326,8 +1689,23 @@ void ha_ndbcluster::unpack_record(byte* buf) field < end; field++, value++) { - if (*value && (*value)->isNULL()) - (*field)->set_null(row_offset); + if ((*value).ptr) + { + if (! ((*field)->flags & BLOB_FLAG)) + { + if ((*value).rec->isNULL()) + (*field)->set_null(row_offset); + } + else + { + NdbBlob* ndb_blob= (*value).blob; + bool isNull= true; + int ret= ndb_blob->getNull(isNull); + DBUG_ASSERT(ret == 0); + if (isNull) + (*field)->set_null(row_offset); + } + } } #ifndef DBUG_OFF @@ -1338,7 +1716,7 @@ void ha_ndbcluster::unpack_record(byte* buf) int hidden_no= table->fields; const NDBTAB *tab= (NDBTAB *) m_table; const NDBCOL *hidden_col= tab->getColumn(hidden_no); - NdbRecAttr* rec= m_value[hidden_no]; + NdbRecAttr* rec= m_value[hidden_no].rec; DBUG_ASSERT(rec); DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, hidden_col->getName(), rec->u_64_value())); @@ -1348,7 +1726,6 @@ void ha_ndbcluster::unpack_record(byte* buf) DBUG_VOID_RETURN; } - /* Utility function to print/dump the fetched field */ @@ -1366,9 +1743,9 @@ void ha_ndbcluster::print_results() { Field *field; const NDBCOL *col; - NdbRecAttr *value; + NdbValue value; - if (!(value= m_value[f])) + if (!(value= m_value[f]).ptr) { fprintf(DBUG_FILE, "Field %d was not read\n", f); continue; @@ -1377,19 +1754,28 @@ void ha_ndbcluster::print_results() DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length()); col= tab->getColumn(f); fprintf(DBUG_FILE, "%d: %s\t", f, col->getName()); - - if (value->isNULL()) + + NdbBlob *ndb_blob= NULL; + if (! (field->flags & BLOB_FLAG)) { - fprintf(DBUG_FILE, "NULL\n"); - continue; + if (value.rec->isNULL()) + { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } + } + else + { + ndb_blob= value.blob; + bool isNull= true; + ndb_blob->getNull(isNull); + if (isNull) { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } } switch (col->getType()) { - case NdbDictionary::Column::Blob: - case NdbDictionary::Column::Clob: - case NdbDictionary::Column::Undefined: - fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); - break; case NdbDictionary::Column::Tinyint: { char value= *field->ptr; fprintf(DBUG_FILE, "Tinyint\t%d", value); @@ -1481,6 +1867,21 @@ void ha_ndbcluster::print_results() fprintf(DBUG_FILE, "Timespec\t%llu", value); break; } + case NdbDictionary::Column::Blob: { + Uint64 len= 0; + ndb_blob->getLength(len); + fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len); + break; + } + case NdbDictionary::Column::Text: { + Uint64 len= 0; + ndb_blob->getLength(len); + fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len); + break; + } + case NdbDictionary::Column::Undefined: + fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); + break; } fprintf(DBUG_FILE, "\n"); @@ -1537,7 +1938,7 @@ int ha_ndbcluster::index_next(byte *buf) { DBUG_ENTER("index_next"); - int error = 1; + int error= 1; statistic_increment(ha_read_next_count,&LOCK_status); DBUG_RETURN(next_result(buf)); } @@ -1628,9 +2029,13 @@ int ha_ndbcluster::rnd_init(bool scan) NdbResultSet *cursor= m_active_cursor; DBUG_ENTER("rnd_init"); DBUG_PRINT("enter", ("scan: %d", scan)); - // Check that cursor is not defined + // Check if scan is to be restarted if (cursor) - DBUG_RETURN(1); + { + if (!scan) + DBUG_RETURN(1); + cursor->restart(); + } index_init(table->primary_key); DBUG_RETURN(0); } @@ -1638,11 +2043,25 @@ int ha_ndbcluster::rnd_init(bool scan) int ha_ndbcluster::close_scan() { NdbResultSet *cursor= m_active_cursor; + NdbConnection *trans= m_active_trans; DBUG_ENTER("close_scan"); if (!cursor) DBUG_RETURN(1); + + if (ops_pending) + { + /* + Take over any pending transactions to the + deleteing/updating transaction before closing the scan + */ + DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + ops_pending= 0; + } + cursor->close(); m_active_cursor= NULL; DBUG_RETURN(0); @@ -1724,7 +2143,7 @@ void ha_ndbcluster::position(const byte *record) // No primary key, get hidden key DBUG_PRINT("info", ("Getting hidden key")); int hidden_no= table->fields; - NdbRecAttr* rec= m_value[hidden_no]; + NdbRecAttr* rec= m_value[hidden_no].rec; const NDBTAB *tab= (NDBTAB *) m_table; const NDBCOL *hidden_col= tab->getColumn(hidden_no); DBUG_ASSERT(hidden_col->getPrimaryKey() && @@ -1755,7 +2174,10 @@ void ha_ndbcluster::info(uint flag) if (flag & HA_STATUS_VARIABLE) DBUG_PRINT("info", ("HA_STATUS_VARIABLE")); if (flag & HA_STATUS_ERRKEY) + { DBUG_PRINT("info", ("HA_STATUS_ERRKEY")); + errkey= dupkey; + } if (flag & HA_STATUS_AUTO) DBUG_PRINT("info", ("HA_STATUS_AUTO")); DBUG_VOID_RETURN; @@ -1875,6 +2297,8 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) break; case HA_EXTRA_CHANGE_KEY_TO_DUP: DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP")); + case HA_EXTRA_KEYREAD_PRESERVE_FIELDS: + DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_PRESERVE_FIELDS")); break; } @@ -1898,7 +2322,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) const NDBTAB *tab= (NDBTAB *) m_table; DBUG_ENTER("start_bulk_insert"); - DBUG_PRINT("enter", ("rows: %d", rows)); + DBUG_PRINT("enter", ("rows: %d", (int)rows)); rows_inserted= 0; rows_to_insert= rows; @@ -1910,7 +2334,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) degrade if too many bytes are inserted, thus it's limited by this calculation. */ - const int bytesperbatch = 8192; + const int bytesperbatch= 8192; bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns(); batch= bytesperbatch/bytes; batch= batch == 0 ? 1 : batch; @@ -1925,15 +2349,32 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) */ int ha_ndbcluster::end_bulk_insert() { + int error= 0; + DBUG_ENTER("end_bulk_insert"); - DBUG_RETURN(0); + // Check if last inserts need to be flushed + if (bulk_insert_not_flushed) + { + NdbConnection *trans= m_active_trans; + // Send rows to NDB + DBUG_PRINT("info", ("Sending inserts to NDB, "\ + "rows_inserted:%d, bulk_insert_rows: %d", + rows_inserted, bulk_insert_rows)); + bulk_insert_not_flushed= false; + if (trans->execute(NoCommit) != 0) + error= ndb_err(trans); + } + + rows_inserted= 0; + rows_to_insert= 1; + DBUG_RETURN(error); } int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) { DBUG_ENTER("extra_opt"); - DBUG_PRINT("enter", ("cache_size: %d", cache_size)); + DBUG_PRINT("enter", ("cache_size: %lu", cache_size)); DBUG_RETURN(extra(operation)); } @@ -1947,7 +2388,7 @@ int ha_ndbcluster::reset() const char **ha_ndbcluster::bas_ext() const -{ static const char *ext[1] = { NullS }; return ext; } +{ static const char *ext[1]= { NullS }; return ext; } /* @@ -2154,7 +2595,7 @@ int ha_ndbcluster::start_stmt(THD *thd) NdbConnection *tablock_trans= (NdbConnection*)thd->transaction.all.ndb_tid; - DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans)); + DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans)); DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans); if (trans == NULL) ERR_RETURN(m_ndb->getNdbError()); @@ -2189,8 +2630,11 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction) if (trans->execute(Commit) != 0) { const NdbError err= trans->getNdbError(); + const NdbOperation *error_op= trans->getNdbErrorOperation(); ERR_PRINT(err); res= ndb_to_mysql_error(&err); + if (res != -1) + ndbcluster_print_error(res, error_op); } ndb->closeTransaction(trans); DBUG_RETURN(res); @@ -2216,8 +2660,11 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction) if (trans->execute(Rollback) != 0) { const NdbError err= trans->getNdbError(); + const NdbOperation *error_op= trans->getNdbErrorOperation(); ERR_PRINT(err); res= ndb_to_mysql_error(&err); + if (res != -1) + ndbcluster_print_error(res, error_op); } ndb->closeTransaction(trans); DBUG_RETURN(0); @@ -2225,71 +2672,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction) /* - Map MySQL type to the corresponding NDB type + Define NDB column based on Field. + Returns 0 or mysql error code. + Not member of ha_ndbcluster because NDBCOL cannot be declared. */ -inline NdbDictionary::Column::Type -mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg) +static int create_ndb_column(NDBCOL &col, + Field *field, + HA_CREATE_INFO *info) { - switch(mysql_type) { + // Set name + col.setName(field->field_name); + // Set type and sizes + const enum enum_field_types mysql_type= field->real_type(); + switch (mysql_type) { + // Numeric types case MYSQL_TYPE_DECIMAL: - return NdbDictionary::Column::Char; + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; case MYSQL_TYPE_TINY: - return (unsigned_flg) ? - NdbDictionary::Column::Tinyunsigned : - NdbDictionary::Column::Tinyint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Tinyunsigned); + else + col.setType(NDBCOL::Tinyint); + col.setLength(1); + break; case MYSQL_TYPE_SHORT: - return (unsigned_flg) ? - NdbDictionary::Column::Smallunsigned : - NdbDictionary::Column::Smallint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Smallunsigned); + else + col.setType(NDBCOL::Smallint); + col.setLength(1); + break; case MYSQL_TYPE_LONG: - return (unsigned_flg) ? - NdbDictionary::Column::Unsigned : - NdbDictionary::Column::Int; - case MYSQL_TYPE_TIMESTAMP: - return NdbDictionary::Column::Unsigned; - case MYSQL_TYPE_LONGLONG: - return (unsigned_flg) ? - NdbDictionary::Column::Bigunsigned : - NdbDictionary::Column::Bigint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Unsigned); + else + col.setType(NDBCOL::Int); + col.setLength(1); + break; case MYSQL_TYPE_INT24: - return (unsigned_flg) ? - NdbDictionary::Column::Mediumunsigned : - NdbDictionary::Column::Mediumint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Mediumunsigned); + else + col.setType(NDBCOL::Mediumint); + col.setLength(1); + break; + case MYSQL_TYPE_LONGLONG: + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Bigunsigned); + else + col.setType(NDBCOL::Bigint); + col.setLength(1); break; case MYSQL_TYPE_FLOAT: - return NdbDictionary::Column::Float; + col.setType(NDBCOL::Float); + col.setLength(1); + break; case MYSQL_TYPE_DOUBLE: - return NdbDictionary::Column::Double; - case MYSQL_TYPE_DATETIME : - return NdbDictionary::Column::Datetime; - case MYSQL_TYPE_DATE : - case MYSQL_TYPE_NEWDATE : - case MYSQL_TYPE_TIME : - case MYSQL_TYPE_YEAR : - // Missing NDB data types, mapped to char - return NdbDictionary::Column::Char; - case MYSQL_TYPE_ENUM : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_SET : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_TINY_BLOB : - case MYSQL_TYPE_MEDIUM_BLOB : - case MYSQL_TYPE_LONG_BLOB : - case MYSQL_TYPE_BLOB : - return NdbDictionary::Column::Blob; - case MYSQL_TYPE_VAR_STRING : - return NdbDictionary::Column::Varchar; - case MYSQL_TYPE_STRING : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_NULL : - case MYSQL_TYPE_GEOMETRY : - return NdbDictionary::Column::Undefined; - } - return NdbDictionary::Column::Undefined; + col.setType(NDBCOL::Double); + col.setLength(1); + break; + // Date types + case MYSQL_TYPE_TIMESTAMP: + col.setType(NDBCOL::Unsigned); + col.setLength(1); + break; + case MYSQL_TYPE_DATETIME: + col.setType(NDBCOL::Datetime); + col.setLength(1); + break; + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_YEAR: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + // Char types + case MYSQL_TYPE_STRING: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Binary); + else + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_VAR_STRING: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Varbinary); + else + col.setType(NDBCOL::Varchar); + col.setLength(field->pack_length()); + break; + // Blob types (all come in as MYSQL_TYPE_BLOB) + mysql_type_tiny_blob: + case MYSQL_TYPE_TINY_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + // No parts + col.setPartSize(0); + col.setStripeSize(0); + break; + mysql_type_blob: + case MYSQL_TYPE_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + // Use "<=" even if "<" is the exact condition + if (field->max_length() <= (1 << 8)) + goto mysql_type_tiny_blob; + else if (field->max_length() <= (1 << 16)) + { + col.setInlineSize(256); + col.setPartSize(2000); + col.setStripeSize(16); + } + else if (field->max_length() <= (1 << 24)) + goto mysql_type_medium_blob; + else + goto mysql_type_long_blob; + break; + mysql_type_medium_blob: + case MYSQL_TYPE_MEDIUM_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + col.setPartSize(4000); + col.setStripeSize(8); + break; + mysql_type_long_blob: + case MYSQL_TYPE_LONG_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + col.setPartSize(8000); + col.setStripeSize(4); + break; + // Other types + case MYSQL_TYPE_ENUM: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_SET: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + goto mysql_type_unsupported; + mysql_type_unsupported: + default: + return HA_ERR_UNSUPPORTED; + } + // Set nullable and pk + col.setNullable(field->maybe_null()); + col.setPrimaryKey(field->flags & PRI_KEY_FLAG); + // Set autoincrement + if (field->flags & AUTO_INCREMENT_FLAG) + { + col.setAutoIncrement(TRUE); + ulonglong value= info->auto_increment_value ? + info->auto_increment_value -1 : (ulonglong) 0; + DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value)); + col.setAutoIncrementInitialValue(value); + } + else + col.setAutoIncrement(false); + return 0; } - /* Create a table in NDB Cluster */ @@ -2299,7 +2859,6 @@ int ha_ndbcluster::create(const char *name, HA_CREATE_INFO *info) { NDBTAB tab; - NdbDictionary::Column::Type ndb_type; NDBCOL col; uint pack_length, length, i; const void *data, *pack_data; @@ -2330,31 +2889,11 @@ int ha_ndbcluster::create(const char *name, for (i= 0; i < form->fields; i++) { Field *field= form->field[i]; - ndb_type= mysql_to_ndb_type(field->real_type(), - field->flags & UNSIGNED_FLAG); DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", field->field_name, field->real_type(), field->pack_length())); - col.setName(field->field_name); - col.setType(ndb_type); - if ((ndb_type == NdbDictionary::Column::Char) || - (ndb_type == NdbDictionary::Column::Varchar)) - col.setLength(field->pack_length()); - else - col.setLength(1); - col.setNullable(field->maybe_null()); - col.setPrimaryKey(field->flags & PRI_KEY_FLAG); - if (field->flags & AUTO_INCREMENT_FLAG) - { - col.setAutoIncrement(TRUE); - ulonglong value= info->auto_increment_value ? - info->auto_increment_value -1 : (ulonglong) 0; - DBUG_PRINT("info", ("Autoincrement key, initial: %d", value)); - col.setAutoIncrementInitialValue(value); - } - else - col.setAutoIncrement(false); - + if ((my_errno= create_ndb_column(col, field, info))) + DBUG_RETURN(my_errno); tab.addColumn(col); } @@ -2389,50 +2928,10 @@ int ha_ndbcluster::create(const char *name, } DBUG_PRINT("info", ("Table %s/%s created successfully", m_dbname, m_tabname)); - - if ((my_errno= build_index_list())) - DBUG_RETURN(my_errno); - - // Create secondary indexes - KEY* key_info= form->key_info; - const char** key_name= key_names; - for (i= 0; i < form->keys; i++, key_info++, key_name++) - { - int error= 0; - DBUG_PRINT("info", ("Index %u: %s", i, *key_name)); - - switch (get_index_type_from_table(i)){ - case PRIMARY_KEY_INDEX: - // Do nothing, already created - break; - case PRIMARY_KEY_ORDERED_INDEX: - error= create_ordered_index(*key_name, key_info); - break; - case UNIQUE_ORDERED_INDEX: - if (!(error= create_ordered_index(*key_name, key_info))) - error= create_unique_index(get_unique_index_name(i), key_info); - break; - case UNIQUE_INDEX: - error= create_unique_index(get_unique_index_name(i), key_info); - break; - case ORDERED_INDEX: - error= create_ordered_index(*key_name, key_info); - break; - default: - DBUG_ASSERT(false); - break; - } + // Create secondary indexes + my_errno= build_index_list(form, ILBP_CREATE); - if (error) - { - DBUG_PRINT("error", ("Failed to create index %u", i)); - drop_table(); - my_errno= error; - break; - } - } - DBUG_RETURN(my_errno); } @@ -2468,6 +2967,7 @@ int ha_ndbcluster::create_index(const char *name, DBUG_ENTER("create_index"); DBUG_PRINT("enter", ("name: %s ", name)); + // NdbDictionary::Index ndb_index(name); NdbDictionary::Index ndb_index(name); if (unique) ndb_index.setType(NdbDictionary::Index::UniqueHashIndex); @@ -2601,10 +3101,17 @@ int ndbcluster_drop_database(const char *path) longlong ha_ndbcluster::get_auto_increment() { - int cache_size = rows_to_insert ? rows_to_insert : 32; + DBUG_ENTER("get_auto_increment"); + DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); + int cache_size= + (rows_to_insert > autoincrement_prefetch) ? + rows_to_insert + : autoincrement_prefetch; Uint64 auto_value= - m_ndb->getAutoIncrementValue(m_tabname, cache_size); - return (longlong)auto_value; + (skip_auto_increment) ? + m_ndb->readAutoIncrementValue((NDBTAB *) m_table) + : m_ndb->getAutoIncrementValue((NDBTAB *) m_table, cache_size); + DBUG_RETURN((longlong)auto_value); } @@ -2619,15 +3126,20 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_ndb(NULL), m_table(NULL), m_table_flags(HA_REC_NOT_IN_SEQ | + HA_NULL_IN_KEY | HA_NOT_EXACT_COUNT | - HA_NO_PREFIX_CHAR_KEYS | - HA_NO_BLOBS), + HA_NO_PREFIX_CHAR_KEYS), m_use_write(false), retrieve_all_fields(FALSE), - rows_to_insert(0), + rows_to_insert(1), rows_inserted(0), bulk_insert_rows(1024), - ops_pending(0) + bulk_insert_not_flushed(false), + ops_pending(0), + skip_auto_increment(true), + blobs_buffer(0), + blobs_buffer_size(0), + dupkey((uint) -1) { int i; @@ -2643,8 +3155,10 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): for (i= 0; i < MAX_KEY; i++) { - m_indextype[i]= UNDEFINED_INDEX; - m_unique_index_name[i]= NULL; + m_index[i].type= UNDEFINED_INDEX; + m_index[i].unique_name= NULL; + m_index[i].unique_index= NULL; + m_index[i].index= NULL; } DBUG_VOID_RETURN; @@ -2660,6 +3174,8 @@ ha_ndbcluster::~ha_ndbcluster() DBUG_ENTER("~ha_ndbcluster"); release_metadata(); + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + blobs_buffer= 0; // Check for open cursor/transaction DBUG_ASSERT(m_active_cursor == NULL); @@ -2888,6 +3404,12 @@ int ndb_discover_tables() bool ndbcluster_init() { DBUG_ENTER("ndbcluster_init"); + // Set connectstring if specified + if (ndbcluster_connectstring != 0) + { + DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring)); + Ndb::setConnectString(ndbcluster_connectstring); + } // Create a Ndb object to open the connection to NDB g_ndb= new Ndb("sys"); if (g_ndb->init() != 0) @@ -2921,6 +3443,7 @@ bool ndbcluster_init() bool ndbcluster_end() { DBUG_ENTER("ndbcluster_end"); + delete g_ndb; g_ndb= NULL; if (!ndbcluster_inited) @@ -2934,13 +3457,22 @@ bool ndbcluster_end() DBUG_RETURN(0); } -void ndbcluster_print_error(int error) +/* + Static error print function called from + static handler method ndbcluster_commit + and ndbcluster_rollback +*/ + +void ndbcluster_print_error(int error, const NdbOperation *error_op) { DBUG_ENTER("ndbcluster_print_error"); TABLE tab; - tab.table_name = NULL; + const char *tab_name= (error_op) ? error_op->getTableName() : ""; + tab.table_name= (char *) tab_name; ha_ndbcluster error_handler(&tab); + tab.file= &error_handler; error_handler.print_error(error, MYF(0)); + DBUG_VOID_RETURN; } /* @@ -2965,7 +3497,7 @@ void ha_ndbcluster::set_tabname(const char *path_name) ptr= m_tabname; while (*ptr != '\0') { - *ptr = tolower(*ptr); + *ptr= tolower(*ptr); ptr++; } #endif @@ -2981,17 +3513,17 @@ ha_ndbcluster::set_tabname(const char *path_name, char * tabname) char *end, *ptr; /* Scan name from the end */ - end = strend(path_name)-1; - ptr = end; + end= strend(path_name)-1; + ptr= end; while (ptr >= path_name && *ptr != '\\' && *ptr != '/') { ptr--; } - uint name_len = end - ptr; + uint name_len= end - ptr; memcpy(tabname, ptr + 1, end - ptr); - tabname[name_len] = '\0'; + tabname[name_len]= '\0'; #ifdef __WIN__ /* Put to lower case */ - ptr = tabname; + ptr= tabname; while (*ptr != '\0') { *ptr= tolower(*ptr); @@ -3154,7 +3686,7 @@ static int packfrm(const void *data, uint len, DBUG_PRINT("enter", ("data: %x, len: %d", data, len)); error= 1; - org_len = len; + org_len= len; if (my_compress((byte*)data, &org_len, &comp_len)) goto err; @@ -3174,9 +3706,9 @@ static int packfrm(const void *data, uint len, // Copy frm data into blob, already in machine independent format memcpy(blob->data, data, org_len); - *pack_data = blob; - *pack_len = blob_len; - error = 0; + *pack_data= blob; + *pack_len= blob_len; + error= 0; DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len)); err: @@ -3188,7 +3720,7 @@ err: static int unpackfrm(const void **unpack_data, uint *unpack_len, const void *pack_data) { - const frm_blob_struct *blob = (frm_blob_struct*)pack_data; + const frm_blob_struct *blob= (frm_blob_struct*)pack_data; byte *data; ulong complen, orglen, ver; DBUG_ENTER("unpackfrm"); @@ -3204,7 +3736,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len, if (ver != 1) DBUG_RETURN(1); - if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME)))) + if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME)))) DBUG_RETURN(2); memcpy(data, blob->data, complen); @@ -3214,8 +3746,8 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len, DBUG_RETURN(3); } - *unpack_data = data; - *unpack_len = complen; + *unpack_data= data; + *unpack_len= complen; DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len)); |