summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <tomas@poseidon.(none)>2004-09-14 08:52:21 +0000
committerunknown <tomas@poseidon.(none)>2004-09-14 08:52:21 +0000
commit2e43e47040f8ece21b45e2d76089d6514788125c (patch)
tree831e0140e9ce404765e5a139b988a78d6a7d1f54 /sql
parentcd573513a3744adf86b68ffd50887baa9d12bbe1 (diff)
downloadmariadb-git-2e43e47040f8ece21b45e2d76089d6514788125c.tar.gz
moved all ndb thread specific data into new placeholder
new methods to keep "records" up to date unset flag HA_NOT_EXACT_COUNT to make handler read "records" field, for count() optim and join optimization new methods to keep "records" up to datecorrect record field in ndbcluster handler new method for ndbcluster handler to store/retrieve table and thread specific data changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl hanged deleteKey to return ponter to deleted object moved heavy global cache fetch from inline to separate method mysql-test/r/ndb_alter_table.result: correct record field in ndbcluster handler mysql-test/r/ndb_blob.result: correct record field in ndbcluster handler ndb/include/ndbapi/NdbDictionary.hpp: new method for ndbcluster handler to store/retrieve table and thread specific data ndb/src/ndbapi/DictCache.cpp: changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl ndb/src/ndbapi/DictCache.hpp: changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl ndb/src/ndbapi/Ndb.cpp: replaced method DictionaryImpl::getTable with DictionaryImpl::get_local_table_info ndb/src/ndbapi/NdbDictionary.cpp: new method for ndbcluster handler to store/retrieve table and thread specific data ndb/src/ndbapi/NdbDictionaryImpl.cpp: changed local hash to store new table_info object, with placeholders for local data, instead of TableImpl moved heavy global cache fetch from inline to separate method ndb/src/ndbapi/NdbDictionaryImpl.hpp: replaced method DictionaryImpl::getTable with DictionaryImpl::get_local_table_info ndb/src/ndbapi/NdbLinHash.hpp: changed deleteKey to return ponter to deleted object sql/ha_ndbcluster.cc: moved all ndb thread specific data into new placeholder new methods to keep "records" up to date unset flag HA_NOT_EXACT_COUNT to make handler read "records" field, for count() optim and join optimization sql/ha_ndbcluster.h: new methods to keep "records" up to date sql/sql_class.h: moved all ndb thread specific data into new placeholder
Diffstat (limited to 'sql')
-rw-r--r--sql/ha_ndbcluster.cc150
-rw-r--r--sql/ha_ndbcluster.h8
-rw-r--r--sql/sql_class.h3
3 files changed, 136 insertions, 25 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index d35f84a8fc8..2c8f6c9d698 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -87,7 +87,8 @@ static int unpackfrm(const void **data, uint *len,
const void* pack_data);
static int ndb_get_table_statistics(Ndb*, const char *,
- Uint64* rows, Uint64* commits);
+ Uint64* rows, Uint64* commits);
+
/*
Error handling functions
@@ -138,6 +139,93 @@ static int ndb_to_mysql_error(const NdbError *err)
/*
+ Place holder for ha_ndbcluster thread specific data
+*/
+
+class Thd_ndb {
+public:
+ Thd_ndb();
+ ~Thd_ndb();
+ Ndb *ndb;
+ ulong count;
+ uint lock_count;
+};
+
+Thd_ndb::Thd_ndb()
+{
+ ndb= 0;
+ lock_count= 0;
+ count= 0;
+}
+
+Thd_ndb::~Thd_ndb()
+{
+}
+
+/*
+ * manage uncommitted insert/deletes during transactio to get records correct
+ */
+
+struct Ndb_table_local_info {
+ int no_uncommitted_rows_count;
+ ulong transaction_count;
+ ha_rows records;
+};
+
+void ha_ndbcluster::records_update()
+{
+ DBUG_ENTER("ha_ndbcluster::records_update");
+ struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
+ DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+ ((const NDBTAB *)m_table)->getTableId(),
+ info->no_uncommitted_rows_count));
+ if (info->records == ~(ha_rows)0)
+ {
+ Uint64 rows;
+ if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
+ info->records= rows;
+ }
+ }
+ records= info->records+ info->no_uncommitted_rows_count;
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
+{
+ DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
+ struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
+ Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
+ if (info->transaction_count != thd_ndb->count)
+ {
+ info->transaction_count = thd_ndb->count;
+ info->no_uncommitted_rows_count= 0;
+ info->records= ~(ha_rows)0;
+ DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+ ((const NDBTAB *)m_table)->getTableId(),
+ info->no_uncommitted_rows_count));
+ }
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_update(int c)
+{
+ DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
+ struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
+ info->no_uncommitted_rows_count+= c;
+ DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+ ((const NDBTAB *)m_table)->getTableId(),
+ info->no_uncommitted_rows_count));
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
+{
+ DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
+ ((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
+ DBUG_VOID_RETURN;
+}
+
+/*
Take care of the error that occured in NDB
RETURN
@@ -145,6 +233,7 @@ static int ndb_to_mysql_error(const NdbError *err)
# The mapped error code
*/
+
int ha_ndbcluster::ndb_err(NdbConnection *trans)
{
int res;
@@ -506,7 +595,7 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_ENTER("get_metadata");
DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
- if (!(tab= dict->getTable(m_tabname)))
+ if (!(tab= dict->getTable(m_tabname, &m_table_info)))
ERR_RETURN(dict->getNdbError());
DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
@@ -556,10 +645,6 @@ int ha_ndbcluster::get_metadata(const char *path)
// All checks OK, lets use the table
m_table= (void*)tab;
- Uint64 rows;
- if(false && ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
- records= rows;
- }
DBUG_RETURN(build_index_list(table, ILBP_OPEN));
}
@@ -1480,6 +1565,7 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
*/
rows_inserted++;
+ no_uncommitted_rows_update(1);
bulk_insert_not_flushed= true;
if ((rows_to_insert == 1) ||
((rows_inserted % bulk_insert_rows) == 0) ||
@@ -1701,6 +1787,8 @@ int ha_ndbcluster::delete_row(const byte *record)
ERR_RETURN(trans->getNdbError());
ops_pending++;
+ no_uncommitted_rows_update(-1);
+
// If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN(0);
}
@@ -1711,6 +1799,8 @@ int ha_ndbcluster::delete_row(const byte *record)
op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
+ no_uncommitted_rows_update(-1);
+
if (table->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
@@ -2259,7 +2349,10 @@ void ha_ndbcluster::info(uint flag)
if (flag & HA_STATUS_CONST)
DBUG_PRINT("info", ("HA_STATUS_CONST"));
if (flag & HA_STATUS_VARIABLE)
+ {
DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
+ records_update();
+ }
if (flag & HA_STATUS_ERRKEY)
{
DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
@@ -2558,9 +2651,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
NdbConnection* trans= NULL;
DBUG_ENTER("external_lock");
- DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
- thd->transaction.ndb_lock_count));
-
/*
Check that this handler instance has a connection
set up to the Ndb object of thd
@@ -2568,10 +2658,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (check_ndb_connection())
DBUG_RETURN(1);
+ Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
+
+ DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d",
+ thd_ndb->lock_count));
+
if (lock_type != F_UNLCK)
{
DBUG_PRINT("info", ("lock_type != F_UNLCK"));
- if (!thd->transaction.ndb_lock_count++)
+ if (!thd_ndb->lock_count++)
{
PRINT_OPTION_FLAGS(thd);
@@ -2584,6 +2679,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
+ no_uncommitted_rows_reset(thd);
thd->transaction.stmt.ndb_tid= trans;
}
else
@@ -2597,6 +2693,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
+ no_uncommitted_rows_reset(thd);
/*
If this is the start of a LOCK TABLE, a table look
@@ -2633,11 +2730,12 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
// Start of transaction
retrieve_all_fields= FALSE;
ops_pending= 0;
+ no_uncommitted_rows_init(thd);
}
else
{
DBUG_PRINT("info", ("lock_type == F_UNLCK"));
- if (!--thd->transaction.ndb_lock_count)
+ if (!--thd_ndb->lock_count)
{
DBUG_PRINT("trans", ("Last external_lock"));
PRINT_OPTION_FLAGS(thd);
@@ -2696,6 +2794,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
trans= m_ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
+ no_uncommitted_rows_reset(thd);
thd->transaction.stmt.ndb_tid= trans;
}
m_active_trans= trans;
@@ -2715,7 +2814,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
int ndbcluster_commit(THD *thd, void *ndb_transaction)
{
int res= 0;
- Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
NdbConnection *trans= (NdbConnection*)ndb_transaction;
DBUG_ENTER("ndbcluster_commit");
@@ -2733,7 +2832,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
if (res != -1)
ndbcluster_print_error(res, error_op);
}
- ndb->closeTransaction(trans);
+ ndb->closeTransaction(trans);
DBUG_RETURN(res);
}
@@ -2745,7 +2844,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
int ndbcluster_rollback(THD *thd, void *ndb_transaction)
{
int res= 0;
- Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
NdbConnection *trans= (NdbConnection*)ndb_transaction;
DBUG_ENTER("ndbcluster_rollback");
@@ -3222,9 +3321,9 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_active_cursor(NULL),
m_ndb(NULL),
m_table(NULL),
+ m_table_info(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY |
- HA_NOT_EXACT_COUNT |
HA_NO_PREFIX_CHAR_KEYS),
m_share(0),
m_use_write(false),
@@ -3249,7 +3348,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
// TODO Adjust number of records and other parameters for proper
// selection of scan/pk access
- records= 100;
+ // records= 100;
+ records= 0;
block_size= 1024;
for (i= 0; i < MAX_KEY; i++)
@@ -3401,25 +3501,30 @@ int ha_ndbcluster::check_ndb_connection()
Ndb* ndb;
DBUG_ENTER("check_ndb_connection");
- if (!thd->transaction.ndb)
+ if (!thd->transaction.thd_ndb)
{
ndb= seize_ndb();
if (!ndb)
DBUG_RETURN(2);
- thd->transaction.ndb= ndb;
+ thd->transaction.thd_ndb= new Thd_ndb();
+ ((Thd_ndb *)thd->transaction.thd_ndb)->ndb= ndb;
}
- m_ndb= (Ndb*)thd->transaction.ndb;
+ m_ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
m_ndb->setDatabaseName(m_dbname);
DBUG_RETURN(0);
}
void ndbcluster_close_connection(THD *thd)
{
+ Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
Ndb* ndb;
DBUG_ENTER("ndbcluster_close_connection");
- ndb= (Ndb*)thd->transaction.ndb;
- ha_ndbcluster::release_ndb(ndb);
- thd->transaction.ndb= NULL;
+ if (thd_ndb)
+ {
+ ha_ndbcluster::release_ndb(thd_ndb->ndb);
+ delete thd_ndb;
+ thd->transaction.thd_ndb= NULL;
+ }
DBUG_VOID_RETURN;
}
@@ -3539,6 +3644,7 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
if (ndb_discover_tables() != 0)
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index c49a6078e7a..44a6873f4e5 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -214,7 +214,8 @@ class ha_ndbcluster: public handler
NdbConnection *m_active_trans;
NdbResultSet *m_active_cursor;
Ndb *m_ndb;
- void *m_table;
+ void *m_table;
+ void *m_table_info;
char m_dbname[FN_HEADLEN];
//char m_schemaname[FN_HEADLEN];
char m_tabname[FN_HEADLEN];
@@ -238,6 +239,11 @@ class ha_ndbcluster: public handler
char *blobs_buffer;
uint32 blobs_buffer_size;
uint dupkey;
+
+ void records_update();
+ void no_uncommitted_rows_update(int);
+ void no_uncommitted_rows_init(THD *);
+ void no_uncommitted_rows_reset(THD *);
};
bool ndbcluster_init(void);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 5a5b0fa81ce..387bba43cad 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -764,9 +764,8 @@ public:
THD_TRANS all; // Trans since BEGIN WORK
THD_TRANS stmt; // Trans for current statement
uint bdb_lock_count;
- uint ndb_lock_count;
#ifdef HAVE_NDBCLUSTER_DB
- void* ndb;
+ void* thd_ndb;
#endif
bool on;
/*