summaryrefslogtreecommitdiff
path: root/storage/ndb/src/ndbapi/NdbPoolImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/ndbapi/NdbPoolImpl.cpp')
-rw-r--r--storage/ndb/src/ndbapi/NdbPoolImpl.cpp528
1 files changed, 528 insertions, 0 deletions
diff --git a/storage/ndb/src/ndbapi/NdbPoolImpl.cpp b/storage/ndb/src/ndbapi/NdbPoolImpl.cpp
new file mode 100644
index 00000000000..32e0a6f1410
--- /dev/null
+++ b/storage/ndb/src/ndbapi/NdbPoolImpl.cpp
@@ -0,0 +1,528 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "NdbPoolImpl.hpp"
+
+NdbMutex *NdbPool::pool_mutex = NULL;
+NdbPool *the_pool = NULL;
+
+NdbPool*
+NdbPool::create_instance(Ndb_cluster_connection* cc,
+ Uint32 max_ndb_obj,
+ Uint32 no_conn_obj,
+ Uint32 init_no_ndb_objects)
+{
+ if (!initPoolMutex()) {
+ return NULL;
+ }
+ NdbMutex_Lock(pool_mutex);
+ NdbPool* a_pool;
+ if (the_pool != NULL) {
+ a_pool = NULL;
+ } else {
+ the_pool = new NdbPool(cc, max_ndb_obj, no_conn_obj);
+ if (!the_pool->init(init_no_ndb_objects)) {
+ delete the_pool;
+ the_pool = NULL;
+ }
+ a_pool = the_pool;
+ }
+ NdbMutex* temp = pool_mutex;
+ if (a_pool == NULL) {
+ pool_mutex = NULL;
+ }
+ NdbMutex_Unlock(pool_mutex);
+ if (a_pool == NULL) {
+ NdbMutex_Destroy(temp);
+ }
+ return a_pool;
+}
+
+void
+NdbPool::drop_instance()
+{
+ if (pool_mutex == NULL) {
+ return;
+ }
+ NdbMutex_Lock(pool_mutex);
+ the_pool->release_all();
+ delete the_pool;
+ the_pool = NULL;
+ NdbMutex* temp = pool_mutex;
+ NdbMutex_Unlock(temp);
+ NdbMutex_Destroy(temp);
+}
+
+bool
+NdbPool::initPoolMutex()
+{
+ bool ret_result = false;
+ if (pool_mutex == NULL) {
+ pool_mutex = NdbMutex_Create();
+ ret_result = ((pool_mutex == NULL) ? false : true);
+ }
+ return ret_result;
+}
+
+NdbPool::NdbPool(Ndb_cluster_connection* cc,
+ Uint32 max_no_objects,
+ Uint32 no_conn_objects)
+{
+ if (no_conn_objects > 1024) {
+ no_conn_objects = 1024;
+ }
+ if (max_no_objects > MAX_NDB_OBJECTS) {
+ max_no_objects = MAX_NDB_OBJECTS;
+ } else if (max_no_objects == 0) {
+ max_no_objects = 1;
+ }
+ m_max_ndb_objects = max_no_objects;
+ m_no_of_conn_objects = no_conn_objects;
+ m_no_of_objects = 0;
+ m_waiting = 0;
+ m_pool_reference = NULL;
+ m_hash_entry = NULL;
+ m_first_free = NULL_POOL;
+ m_first_not_in_use = NULL_POOL;
+ m_last_free = NULL_POOL;
+ input_pool_cond = NULL;
+ output_pool_cond = NULL;
+ m_output_queue = 0;
+ m_input_queue = 0;
+ m_signal_count = 0;
+ m_cluster_connection = cc;
+}
+
+NdbPool::~NdbPool()
+{
+ NdbCondition_Destroy(input_pool_cond);
+ NdbCondition_Destroy(output_pool_cond);
+}
+
+void
+NdbPool::release_all()
+{
+ int i;
+ for (i = 0; i < m_max_ndb_objects + 1; i++) {
+ if (m_pool_reference[i].ndb_reference != NULL) {
+ assert(m_pool_reference[i].in_use);
+ assert(m_pool_reference[i].free_entry);
+ delete m_pool_reference[i].ndb_reference;
+ }
+ }
+ delete [] m_pool_reference;
+ delete [] m_hash_entry;
+ m_pool_reference = NULL;
+ m_hash_entry = NULL;
+}
+
+bool
+NdbPool::init(Uint32 init_no_objects)
+{
+ bool ret_result = false;
+ int i;
+ do {
+ input_pool_cond = NdbCondition_Create();
+ output_pool_cond = NdbCondition_Create();
+ if (input_pool_cond == NULL || output_pool_cond == NULL) {
+ break;
+ }
+ if (init_no_objects > m_max_ndb_objects) {
+ init_no_objects = m_max_ndb_objects;
+ }
+ if (init_no_objects == 0) {
+ init_no_objects = 1;
+ }
+ m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1];
+ m_hash_entry = new Uint8[POOL_HASH_TABLE_SIZE];
+
+ if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) {
+ delete [] m_pool_reference;
+ delete [] m_hash_entry;
+ break;
+ }
+ for (i = 0; i < m_max_ndb_objects + 1; i++) {
+ m_pool_reference[i].ndb_reference = NULL;
+ m_pool_reference[i].in_use = false;
+ m_pool_reference[i].next_free_object = i+1;
+ m_pool_reference[i].prev_free_object = i-1;
+ m_pool_reference[i].next_db_object = NULL_POOL;
+ m_pool_reference[i].prev_db_object = NULL_POOL;
+ }
+ for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) {
+ m_hash_entry[i] = NULL_HASH;
+ }
+ m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL;
+ m_pool_reference[1].prev_free_object = NULL_POOL;
+ m_first_not_in_use = 1;
+ m_no_of_objects = init_no_objects;
+ for (i = init_no_objects; i > 0 ; i--) {
+ Uint32 fake_id;
+ if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) {
+ release_all();
+ break;
+ }
+ }
+ ret_result = true;
+ break;
+ } while (1);
+ return ret_result;
+}
+
+/*
+Get an Ndb object.
+Input:
+hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread
+ used the last time.
+a_db_name: NULL = don't check for database specific Ndb object, otherwise
+ a hint of which database is preferred.
+Output:
+hint_id: Returns id of Ndb object returned
+Return value: Ndb object pointer
+*/
+Ndb*
+NdbPool::get_ndb_object(Uint32 &hint_id,
+ const char* a_catalog_name,
+ const char* a_schema_name)
+{
+ Ndb* ret_ndb = NULL;
+ Uint32 hash_entry = compute_hash(a_schema_name);
+ NdbMutex_Lock(pool_mutex);
+ while (1) {
+ /*
+ We start by checking if we can use the hinted Ndb object.
+ */
+ if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) {
+ break;
+ }
+ /*
+ The hinted Ndb object was not free. We need to allocate another object.
+ We start by checking for a free Ndb object connected to the same database.
+ */
+ if (a_schema_name && (ret_ndb = get_db_hash(hint_id,
+ hash_entry,
+ a_catalog_name,
+ a_schema_name))) {
+ break;
+ }
+ /*
+ No Ndb object connected to the preferred database was found.
+ We look for a free Ndb object in general.
+ */
+ if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) {
+ break;
+ }
+ /*
+ No free Ndb object was found. If we haven't allocated objects up until the
+ maximum number yet then we can allocate a new Ndb object here.
+ */
+ if (m_no_of_objects < m_max_ndb_objects) {
+ if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) {
+ assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL);
+ break;
+ }
+ }
+ /*
+ We need to wait until an Ndb object becomes
+ available.
+ */
+ if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) {
+ break;
+ }
+ /*
+ Not even after waiting were we able to get hold of an Ndb object. We
+ return NULL to indicate this problem.
+ */
+ ret_ndb = NULL;
+ break;
+ }
+ NdbMutex_Unlock(pool_mutex);
+ if (ret_ndb != NULL) {
+ /*
+ We need to set the catalog and schema name of the Ndb object before
+ returning it to the caller.
+ */
+ ret_ndb->setCatalogName(a_catalog_name);
+ ret_ndb->setSchemaName(a_schema_name);
+ }
+ return ret_ndb;
+}
+
+void
+NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id)
+{
+ NdbMutex_Lock(pool_mutex);
+ assert(id <= m_max_ndb_objects);
+ assert(id != 0);
+ assert(returned_ndb == m_pool_reference[id].ndb_reference);
+ bool wait_cond = m_waiting;
+ if (wait_cond) {
+ NdbCondition* pool_cond;
+ if (m_signal_count > 0) {
+ pool_cond = output_pool_cond;
+ m_signal_count--;
+ } else {
+ pool_cond = input_pool_cond;
+ }
+ add_wait_list(id);
+ NdbMutex_Unlock(pool_mutex);
+ NdbCondition_Signal(pool_cond);
+ } else {
+ add_free_list(id);
+ add_db_hash(id);
+ NdbMutex_Unlock(pool_mutex);
+ }
+}
+
+bool
+NdbPool::allocate_ndb(Uint32 &id,
+ const char* a_catalog_name,
+ const char* a_schema_name)
+{
+ Ndb* a_ndb;
+ if (m_first_not_in_use == NULL_POOL) {
+ return false;
+ }
+ if (a_schema_name) {
+ a_ndb = new Ndb(m_cluster_connection, a_schema_name, a_catalog_name);
+ } else {
+ a_ndb = new Ndb(m_cluster_connection, "");
+ }
+ if (a_ndb == NULL) {
+ return false;
+ }
+ a_ndb->init(m_no_of_conn_objects);
+ m_no_of_objects++;
+
+ id = m_first_not_in_use;
+ Uint32 allocated_id = m_first_not_in_use;
+ m_first_not_in_use = m_pool_reference[allocated_id].next_free_object;
+
+ m_pool_reference[allocated_id].ndb_reference = a_ndb;
+ m_pool_reference[allocated_id].in_use = true;
+ m_pool_reference[allocated_id].free_entry = false;
+
+ add_free_list(allocated_id);
+ add_db_hash(allocated_id);
+ return true;
+}
+
+void
+NdbPool::add_free_list(Uint32 id)
+{
+ assert(!m_pool_reference[id].free_entry);
+ assert(m_pool_reference[id].in_use);
+ m_pool_reference[id].free_entry = true;
+ m_pool_reference[id].next_free_object = m_first_free;
+ m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL;
+ m_first_free = (Uint8)id;
+ if (m_last_free == (Uint8)NULL_POOL) {
+ m_last_free = (Uint8)id;
+ }
+}
+
+void
+NdbPool::add_db_hash(Uint32 id)
+{
+ Ndb* t_ndb = m_pool_reference[id].ndb_reference;
+ const char* schema_name = t_ndb->getSchemaName();
+ Uint32 hash_entry = compute_hash(schema_name);
+ Uint8 next_db_entry = m_hash_entry[hash_entry];
+ m_pool_reference[id].next_db_object = next_db_entry;
+ m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH;
+ m_hash_entry[hash_entry] = (Uint8)id;
+}
+
+Ndb*
+NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry)
+{
+ if (m_first_free == NULL_POOL) {
+ return NULL;
+ }
+ id = m_first_free;
+ Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry);
+ assert(ret_ndb != NULL);
+ return ret_ndb;
+}
+
+Ndb*
+NdbPool::get_db_hash(Uint32 &id,
+ Uint32 hash_entry,
+ const char *a_catalog_name,
+ const char *a_schema_name)
+{
+ Uint32 entry_id = m_hash_entry[hash_entry];
+ bool found = false;
+ while (entry_id != NULL_HASH) {
+ Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference;
+ const char *a_ndb_catalog_name = t_ndb->getCatalogName();
+ if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) {
+ const char *a_ndb_schema_name = t_ndb->getSchemaName();
+ if (strcmp(a_schema_name, a_ndb_schema_name) == 0) {
+ found = true;
+ break;
+ }
+ }
+ entry_id = m_pool_reference[entry_id].next_db_object;
+ }
+ if (found) {
+ id = entry_id;
+ Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry);
+ assert(ret_ndb != NULL);
+ return ret_ndb;
+ }
+ return NULL;
+}
+
+Ndb*
+NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry)
+{
+ Ndb* ret_ndb = NULL;
+ do {
+ if ((hint_id != 0) &&
+ (hint_id <= m_max_ndb_objects) &&
+ (m_pool_reference[hint_id].in_use) &&
+ (m_pool_reference[hint_id].free_entry)) {
+ ret_ndb = m_pool_reference[hint_id].ndb_reference;
+ if (ret_ndb != NULL) {
+ break;
+ } else {
+ assert(false);
+ }
+ }
+ return NULL;
+ } while (1);
+ /*
+ This is where we remove the entry from the free list and from the db hash
+ table.
+ */
+ remove_free_list(hint_id);
+ remove_db_hash(hint_id, hash_entry);
+ return ret_ndb;
+}
+
+void
+NdbPool::remove_free_list(Uint32 id)
+{
+ Uint8 next_free_entry = m_pool_reference[id].next_free_object;
+ Uint8 prev_free_entry = m_pool_reference[id].prev_free_object;
+ if (prev_free_entry == (Uint8)NULL_POOL) {
+ m_first_free = next_free_entry;
+ } else {
+ m_pool_reference[prev_free_entry].next_free_object = next_free_entry;
+ }
+ if (next_free_entry == (Uint8)NULL_POOL) {
+ m_last_free = prev_free_entry;
+ } else {
+ m_pool_reference[next_free_entry].prev_free_object = prev_free_entry;
+ }
+ m_pool_reference[id].next_free_object = NULL_POOL;
+ m_pool_reference[id].prev_free_object = NULL_POOL;
+ m_pool_reference[id].free_entry = false;
+}
+
+void
+NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry)
+{
+ Uint8 next_free_entry = m_pool_reference[id].next_db_object;
+ Uint8 prev_free_entry = m_pool_reference[id].prev_db_object;
+ if (prev_free_entry == (Uint8)NULL_HASH) {
+ m_hash_entry[hash_entry] = next_free_entry;
+ } else {
+ m_pool_reference[prev_free_entry].next_db_object = next_free_entry;
+ }
+ if (next_free_entry == (Uint8)NULL_HASH) {
+ ;
+ } else {
+ m_pool_reference[next_free_entry].prev_db_object = prev_free_entry;
+ }
+ m_pool_reference[id].next_db_object = NULL_HASH;
+ m_pool_reference[id].prev_db_object = NULL_HASH;
+}
+
+Uint32
+NdbPool::compute_hash(const char *a_schema_name)
+{
+ Uint32 len = strlen(a_schema_name);
+ Uint32 h = 147;
+ for (Uint32 i = 0; i < len; i++) {
+ Uint32 c = a_schema_name[i];
+ h = (h << 5) + h + c;
+ }
+ h &= (POOL_HASH_TABLE_SIZE - 1);
+ return h;
+}
+
+Ndb*
+NdbPool::wait_free_ndb(Uint32 &id)
+{
+ int res;
+ int time_out = 3500;
+ do {
+ NdbCondition* tmp = input_pool_cond;
+ m_waiting++;
+ m_input_queue++;
+ time_out -= 500;
+ res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out);
+ if (tmp == input_pool_cond) {
+ m_input_queue--;
+ } else {
+ m_output_queue--;
+ if (m_output_queue == 0) {
+ switch_condition_queue();
+ }
+ }
+ m_waiting--;
+ } while (res == 0 && m_first_wait == NULL_POOL);
+ if (res != 0 && m_first_wait == NULL_POOL) {
+ return NULL;
+ }
+ id = m_first_wait;
+ remove_wait_list();
+ assert(m_waiting != 0 || m_first_wait == NULL_POOL);
+ return m_pool_reference[id].ndb_reference;
+}
+
+void
+NdbPool::remove_wait_list()
+{
+ Uint32 id = m_first_wait;
+ m_first_wait = m_pool_reference[id].next_free_object;
+ m_pool_reference[id].next_free_object = NULL_POOL;
+ m_pool_reference[id].prev_free_object = NULL_POOL;
+ m_pool_reference[id].free_entry = false;
+}
+
+void
+NdbPool::add_wait_list(Uint32 id)
+{
+ m_pool_reference[id].next_free_object = m_first_wait;
+ m_first_wait = id;
+}
+
+void
+NdbPool::switch_condition_queue()
+{
+ m_signal_count = m_input_queue;
+ Uint8 move_queue = m_input_queue;
+ m_input_queue = m_output_queue;
+ m_output_queue = move_queue;
+
+ NdbCondition* move_cond = input_pool_cond;
+ input_pool_cond = output_pool_cond;
+ output_pool_cond = move_cond;
+}
+