Diffstat (limited to 'storage/maria/trxman.c')
-rw-r--r-- | storage/maria/trxman.c | 261
1 file changed, 261 insertions, 0 deletions
diff --git a/storage/maria/trxman.c b/storage/maria/trxman.c
new file mode 100644
index 00000000000..3c64d93183a
--- /dev/null
+++ b/storage/maria/trxman.c
@@ -0,0 +1,261 @@
+
+#include <my_global.h>
+#include <my_sys.h>
+#include <lf.h>
+#include "trxman.h"
+
+TRX active_list_min, active_list_max,
+    committed_list_min, committed_list_max, *pool;
+
+pthread_mutex_t LOCK_trx_list;
+uint active_transactions;
+TrID global_trid_generator;
+
+TRX **short_id_to_trx;
+my_atomic_rwlock_t LOCK_short_id_to_trx;
+
+LF_HASH trid_to_trx;
+
+static byte *trx_get_hash_key(const byte *trx,uint* len, my_bool unused)
+{
+  *len= sizeof(TrID);
+  return (byte *) & ((*((TRX **)trx))->trid);
+}
+
+int trxman_init()
+{
+  pthread_mutex_init(&LOCK_trx_list, MY_MUTEX_INIT_FAST);
+  active_list_max.trid= active_list_min.trid= 0;
+  active_list_max.min_read_from=~0;
+  active_list_max.next= active_list_min.prev= 0;
+  active_list_max.prev= &active_list_min;
+  active_list_min.next= &active_list_max;
+  active_transactions= 0;
+
+  committed_list_max.commit_trid= ~0;
+  committed_list_max.next= committed_list_min.prev= 0;
+  committed_list_max.prev= &committed_list_min;
+  committed_list_min.next= &committed_list_max;
+
+  pool=0;
+  global_trid_generator=0; /* set later by recovery code */
+  lf_hash_init(&trid_to_trx, sizeof(TRX*), LF_HASH_UNIQUE,
+               0, 0, trx_get_hash_key, 0);
+  my_atomic_rwlock_init(&LOCK_short_id_to_trx);
+  short_id_to_trx=(TRX **)my_malloc(SHORT_ID_MAX*sizeof(TRX*),
+                                    MYF(MY_WME|MY_ZEROFILL));
+  if (!short_id_to_trx)
+    return 1;
+  short_id_to_trx--; /* min short_id is 1 */
+
+  return 0;
+}
+
+int trxman_destroy()
+{
+  DBUG_ASSERT(trid_to_trx.count == 0);
+  DBUG_ASSERT(active_transactions == 0);
+  DBUG_ASSERT(active_list_max.prev == &active_list_min);
+  DBUG_ASSERT(active_list_min.next == &active_list_max);
+  DBUG_ASSERT(committed_list_max.prev == &committed_list_min);
+  DBUG_ASSERT(committed_list_min.next == &committed_list_max);
+  while (pool)
+  {
+    TRX *tmp=pool->next;
+    my_free(pool, MYF(0));
+    pool=tmp;
+  }
+  lf_hash_destroy(&trid_to_trx);
+  pthread_mutex_destroy(&LOCK_trx_list);
+  my_atomic_rwlock_destroy(&LOCK_short_id_to_trx);
+  my_free((void *)(short_id_to_trx+1), MYF(0));
+}
+
+static TrID new_trid()
+{
+  DBUG_ASSERT(global_trid_generator < 0xffffffffffffLL);
+  safe_mutex_assert_owner(&LOCK_trx_list);
+  return ++global_trid_generator;
+}
+
+static void set_short_id(TRX *trx)
+{
+  int i= (global_trid_generator + (intptr)trx) * 312089 % SHORT_ID_MAX;
+  my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
+  for ( ; ; i= i % SHORT_ID_MAX + 1) /* the range is [1..SHORT_ID_MAX] */
+  {
+    void *tmp=NULL;
+    if (short_id_to_trx[i] == NULL &&
+        my_atomic_casptr((void **)&short_id_to_trx[i], &tmp, trx))
+      break;
+  }
+  my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
+  trx->short_id= i;
+}
+
+extern int global_malloc;
+TRX *trxman_new_trx()
+{
+  TRX *trx;
+
+  my_atomic_add32(&active_transactions, 1);
+
+  /*
+    we need a mutex here to ensure that
+    transactions in the active list are ordered by the trid.
+    So, incrementing global_trid_generator and
+    adding to the list must be atomic.
+
+    and as we have a mutex, we can as well do everything
+    under it - allocating a TRX, incrementing active_transactions,
+    setting trx->min_read_from.
+
+    Note that all the above is fast. generating short_id may be slow,
+    as it involves scanning a big array - so it's still done
+    outside of the mutex.
+  */
+
+  pthread_mutex_lock(&LOCK_trx_list);
+  trx=pool;
+  while (trx && !my_atomic_casptr((void **)&pool, (void **)&trx, trx->next))
+    /* no-op */;
+
+  if (!trx)
+  {
+    trx=(TRX *)my_malloc(sizeof(TRX), MYF(MY_WME));
+    global_malloc++;
+  }
+  if (!trx)
+    return 0;
+
+  trx->min_read_from= active_list_min.next->trid;
+
+  trx->trid= new_trid();
+  trx->short_id= 0;
+
+  trx->next= &active_list_max;
+  trx->prev= active_list_max.prev;
+  active_list_max.prev= trx->prev->next= trx;
+  pthread_mutex_unlock(&LOCK_trx_list);
+
+  trx->pins=lf_hash_get_pins(&trid_to_trx);
+
+  if (!trx->min_read_from)
+    trx->min_read_from= trx->trid;
+
+  trx->commit_trid=0;
+
+  set_short_id(trx); /* this must be the last! */
+
+
+  return trx;
+}
+
+/*
+  remove a trx from the active list,
+  move to committed list,
+  set commit_trid
+
+  TODO
+    integrate with lock manager, log manager. That means:
+    a common "commit" mutex - forcing the log and setting commit_trid
+    must be done atomically (QQ how the heck it could be done with
+    group commit ???)
+
+    trid_to_trx, active_list_*, and committed_list_* can be
+    updated asynchronously.
+*/
+void trxman_end_trx(TRX *trx, my_bool commit)
+{
+  int res;
+  TRX *free_me= 0;
+  LF_PINS *pins= trx->pins;
+
+  pthread_mutex_lock(&LOCK_trx_list);
+  trx->next->prev= trx->prev;
+  trx->prev->next= trx->next;
+
+  if (trx->prev == &active_list_min)
+  {
+    TRX *t;
+    for (t= committed_list_min.next;
+         t->commit_trid < active_list_min.next->min_read_from;
+         t= t->next) /* no-op */;
+
+    if (t != committed_list_min.next)
+    {
+      free_me= committed_list_min.next;
+      committed_list_min.next= t;
+      t->prev->next=0;
+      t->prev= &committed_list_min;
+    }
+  }
+
+  my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
+  my_atomic_storeptr((void **)&short_id_to_trx[trx->short_id], 0);
+  my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
+
+  if (commit && active_list_min.next != &active_list_max)
+  {
+    trx->commit_trid= global_trid_generator;
+
+    trx->next= &committed_list_max;
+    trx->prev= committed_list_max.prev;
+    committed_list_max.prev= trx->prev->next= trx;
+
+    res= lf_hash_insert(&trid_to_trx, pins, &trx);
+    DBUG_ASSERT(res == 0);
+  }
+  else
+  {
+    trx->next=free_me;
+    free_me=trx;
+  }
+  pthread_mutex_unlock(&LOCK_trx_list);
+
+  my_atomic_add32(&active_transactions, -1);
+
+  while (free_me)
+  {
+    int res;
+    TRX *t= free_me;
+    free_me= free_me->next;
+
+    res= lf_hash_delete(&trid_to_trx, pins, &t->trid, sizeof(TrID));
+
+    trxman_free_trx(t);
+  }
+
+  lf_hash_put_pins(pins);
+}
+
+/* free a trx (add it to the pool, that is) */
+void trxman_free_trx(TRX *trx)
+{
+  TRX *tmp=pool;
+
+  do
+  {
+    trx->next=tmp;
+  } while (!my_atomic_casptr((void **)&pool, (void **)&tmp, trx));
+}
+
+my_bool trx_can_read_from(TRX *trx, TrID trid)
+{
+  TRX *found;
+  my_bool can;
+
+  if (trid < trx->min_read_from)
+    return TRUE;
+  if (trid > trx->trid)
+    return FALSE;
+
+  found= lf_hash_search(&trid_to_trx, trx->pins, &trid, sizeof(trid));
+  if (!found)
+    return FALSE; /* not in the hash = cannot read */
+
+  can= found->commit_trid < trx->trid;
+  lf_unpin(trx->pins, 2);
+  return can;
+}
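
The last function in the diff, trx_can_read_from(), encodes the MVCC visibility rule: anything older than the reader's min_read_from is visible, anything newer than the reader itself is not, and for the range in between the writer must be found in the trid_to_trx hash with a commit_trid smaller than the reader's trid. Below is a minimal standalone sketch of the same decision procedure; the names are invented for illustration only, and the lock-free hash lookup is replaced by passing the writer's commit trid directly, with 0 standing in for "not found / not committed".

    #include <stdio.h>
    #include <stdbool.h>
    #include <stdint.h>

    typedef uint64_t TrID_demo;

    /* simplified stand-in for a TRX: only the fields the check needs */
    struct reader_demo
    {
      TrID_demo trid;          /* when this transaction started              */
      TrID_demo min_read_from; /* oldest trid that was still active at start */
    };

    /*
      Mirror of the rule in trx_can_read_from():
      - writers older than min_read_from are always visible;
      - writers younger than the reader itself are never visible;
      - otherwise the writer must have committed before the reader started.
      writer_commit_trid == 0 stands in for "not in the hash", i.e. the writer
      has not committed.
    */
    static bool can_read_from_demo(const struct reader_demo *reader,
                                   TrID_demo writer_trid,
                                   TrID_demo writer_commit_trid)
    {
      if (writer_trid < reader->min_read_from)
        return true;
      if (writer_trid > reader->trid)
        return false;
      if (writer_commit_trid == 0)          /* still uncommitted */
        return false;
      return writer_commit_trid < reader->trid;
    }

    int main(void)
    {
      struct reader_demo r= { 42, 30 };
      printf("%d\n", can_read_from_demo(&r, 10, 12)); /* 1: below min_read_from */
      printf("%d\n", can_read_from_demo(&r, 50, 55)); /* 0: started after reader */
      printf("%d\n", can_read_from_demo(&r, 35, 40)); /* 1: committed before 42 */
      printf("%d\n", can_read_from_demo(&r, 35, 0));  /* 0: not committed yet */
      return 0;
    }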
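
For orientation, here is a rough sketch of how a caller might drive this API across the life of a single transaction. It is an assumption about intended usage rather than code from the tree: it presumes being compiled inside the Maria engine (so my_global.h, trxman.h and the mysys/lf libraries are available, and the extern global_malloc counter is defined elsewhere), and a real caller would keep trxman_init()/trxman_destroy() at server start and stop rather than per transaction.

    #include <my_global.h>
    #include <my_sys.h>
    #include <lf.h>
    #include "trxman.h"

    /* hypothetical driver: start a transaction, do one visibility check, commit */
    static int run_one_transaction(TrID some_writer_trid)
    {
      TRX *trx;

      if (trxman_init())              /* lists, trid_to_trx hash, short-id map */
        return 1;

      if (!(trx= trxman_new_trx()))   /* allocates trid, short_id and LF pins */
      {
        trxman_destroy();
        return 1;
      }

      if (trx_can_read_from(trx, some_writer_trid))
      {
        /* ... this transaction may use the row version written by
           some_writer_trid ... */
      }

      trxman_end_trx(trx, TRUE);      /* TRUE: commit, FALSE: rollback */
      trxman_destroy();
      return 0;
    }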