summaryrefslogtreecommitdiff
path: root/storage/maria/trxman.c
diff options
context:
space:
mode:
Diffstat (limited to 'storage/maria/trxman.c')
-rw-r--r--storage/maria/trxman.c261
1 file changed, 261 insertions, 0 deletions
diff --git a/storage/maria/trxman.c b/storage/maria/trxman.c
new file mode 100644
index 00000000000..3c64d93183a
--- /dev/null
+++ b/storage/maria/trxman.c
@@ -0,0 +1,261 @@
+
+#include <my_global.h>
+#include <my_sys.h>
+#include <lf.h>
+#include "trxman.h"
+
+TRX active_list_min, active_list_max,
+ committed_list_min, committed_list_max, *pool;
+
+pthread_mutex_t LOCK_trx_list;
+uint active_transactions;
+TrID global_trid_generator;
+
+TRX **short_id_to_trx;
+my_atomic_rwlock_t LOCK_short_id_to_trx;
+
+LF_HASH trid_to_trx;
+
+static byte *trx_get_hash_key(const byte *trx,uint* len, my_bool unused)
+{
+ *len= sizeof(TrID);
+ return (byte *) & ((*((TRX **)trx))->trid);
+}
+
+int trxman_init()
+{
+ pthread_mutex_init(&LOCK_trx_list, MY_MUTEX_INIT_FAST);
+ active_list_max.trid= active_list_min.trid= 0;
+ active_list_max.min_read_from=~0;
+ active_list_max.next= active_list_min.prev= 0;
+ active_list_max.prev= &active_list_min;
+ active_list_min.next= &active_list_max;
+ active_transactions= 0;
+
+ committed_list_max.commit_trid= ~0;
+ committed_list_max.next= committed_list_min.prev= 0;
+ committed_list_max.prev= &committed_list_min;
+ committed_list_min.next= &committed_list_max;
+
+ pool=0;
+ global_trid_generator=0; /* set later by recovery code */
+ lf_hash_init(&trid_to_trx, sizeof(TRX*), LF_HASH_UNIQUE,
+ 0, 0, trx_get_hash_key, 0);
+ my_atomic_rwlock_init(&LOCK_short_id_to_trx);
+ short_id_to_trx=(TRX **)my_malloc(SHORT_ID_MAX*sizeof(TRX*),
+ MYF(MY_WME|MY_ZEROFILL));
+ if (!short_id_to_trx)
+ return 1;
+ short_id_to_trx--; /* min short_id is 1 */
+
+ return 0;
+}
+
+int trxman_destroy()
+{
+ DBUG_ASSERT(trid_to_trx.count == 0);
+ DBUG_ASSERT(active_transactions == 0);
+ DBUG_ASSERT(active_list_max.prev == &active_list_min);
+ DBUG_ASSERT(active_list_min.next == &active_list_max);
+ DBUG_ASSERT(committed_list_max.prev == &committed_list_min);
+ DBUG_ASSERT(committed_list_min.next == &committed_list_max);
+ while (pool)
+ {
+ TRX *tmp=pool->next;
+ my_free(pool, MYF(0));
+ pool=tmp;
+ }
+ lf_hash_destroy(&trid_to_trx);
+ pthread_mutex_destroy(&LOCK_trx_list);
+ my_atomic_rwlock_destroy(&LOCK_short_id_to_trx);
+ my_free((void *)(short_id_to_trx+1), MYF(0));
+}
+
+static TrID new_trid()
+{
+ DBUG_ASSERT(global_trid_generator < 0xffffffffffffLL);
+ safe_mutex_assert_owner(&LOCK_trx_list);
+ return ++global_trid_generator;
+}
+
+static void set_short_id(TRX *trx)
+{
+ int i= (global_trid_generator + (intptr)trx) * 312089 % SHORT_ID_MAX;
+ my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
+ for ( ; ; i= i % SHORT_ID_MAX + 1) /* the range is [1..SHORT_ID_MAX] */
+ {
+ void *tmp=NULL;
+ if (short_id_to_trx[i] == NULL &&
+ my_atomic_casptr((void **)&short_id_to_trx[i], &tmp, trx))
+ break;
+ }
+ my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
+ trx->short_id= i;
+}
+
+extern int global_malloc;
+TRX *trxman_new_trx()
+{
+ TRX *trx;
+
+ my_atomic_add32(&active_transactions, 1);
+
+ /*
+ we need a mutex here to ensure that
+ transactions in the active list are ordered by the trid.
+ So, incrementing global_trid_generator and
+ adding to the list must be atomic.
+
+ and as we have a mutex, we can as well do everything
+ under it - allocating a TRX, incrementing active_transactions,
+ setting trx->min_read_from.
+
+ Note that all the above is fast. generating short_id may be slow,
+ as it involves scanning a big array - so it's still done
+ outside of the mutex.
+ */
+
+ pthread_mutex_lock(&LOCK_trx_list);
+ trx=pool;
+ while (trx && !my_atomic_casptr((void **)&pool, (void **)&trx, trx->next))
+ /* no-op */;
+
+ if (!trx)
+ {
+ trx=(TRX *)my_malloc(sizeof(TRX), MYF(MY_WME));
+ global_malloc++;
+ }
+ if (!trx)
+ return 0;
+
+ trx->min_read_from= active_list_min.next->trid;
+
+ trx->trid= new_trid();
+ trx->short_id= 0;
+
+ trx->next= &active_list_max;
+ trx->prev= active_list_max.prev;
+ active_list_max.prev= trx->prev->next= trx;
+ pthread_mutex_unlock(&LOCK_trx_list);
+
+ trx->pins=lf_hash_get_pins(&trid_to_trx);
+
+ if (!trx->min_read_from)
+ trx->min_read_from= trx->trid;
+
+ trx->commit_trid=0;
+
+ set_short_id(trx); /* this must be the last! */
+
+
+ return trx;
+}
+
+/*
+ remove a trx from the active list,
+ move to committed list,
+ set commit_trid
+
+ TODO
+ integrate with lock manager, log manager. That means:
+ a common "commit" mutex - forcing the log and setting commit_trid
+ must be done atomically (QQ how the heck it could be done with
+ group commit ???)
+
+ trid_to_trx, active_list_*, and committed_list_* can be
+ updated asyncronously.
+*/
+void trxman_end_trx(TRX *trx, my_bool commit)
+{
+ int res;
+ TRX *free_me= 0;
+ LF_PINS *pins= trx->pins;
+
+ pthread_mutex_lock(&LOCK_trx_list);
+ trx->next->prev= trx->prev;
+ trx->prev->next= trx->next;
+
+ if (trx->prev == &active_list_min)
+ {
+ TRX *t;
+ for (t= committed_list_min.next;
+ t->commit_trid < active_list_min.next->min_read_from;
+ t= t->next) /* no-op */;
+
+ if (t != committed_list_min.next)
+ {
+ free_me= committed_list_min.next;
+ committed_list_min.next= t;
+ t->prev->next=0;
+ t->prev= &committed_list_min;
+ }
+ }
+
+ my_atomic_rwlock_wrlock(&LOCK_short_id_to_trx);
+ my_atomic_storeptr((void **)&short_id_to_trx[trx->short_id], 0);
+ my_atomic_rwlock_wrunlock(&LOCK_short_id_to_trx);
+
+ if (commit && active_list_min.next != &active_list_max)
+ {
+ trx->commit_trid= global_trid_generator;
+
+ trx->next= &committed_list_max;
+ trx->prev= committed_list_max.prev;
+ committed_list_max.prev= trx->prev->next= trx;
+
+ res= lf_hash_insert(&trid_to_trx, pins, &trx);
+ DBUG_ASSERT(res == 0);
+ }
+ else
+ {
+ trx->next=free_me;
+ free_me=trx;
+ }
+ pthread_mutex_unlock(&LOCK_trx_list);
+
+ my_atomic_add32(&active_transactions, -1);
+
+ while (free_me)
+ {
+ int res;
+ TRX *t= free_me;
+ free_me= free_me->next;
+
+ res= lf_hash_delete(&trid_to_trx, pins, &t->trid, sizeof(TrID));
+
+ trxman_free_trx(t);
+ }
+
+ lf_hash_put_pins(pins);
+}
+
+/* free a trx (add to the pool, that is */
+void trxman_free_trx(TRX *trx)
+{
+ TRX *tmp=pool;
+
+ do
+ {
+ trx->next=tmp;
+ } while (!my_atomic_casptr((void **)&pool, (void **)&tmp, trx));
+}
+
+my_bool trx_can_read_from(TRX *trx, TrID trid)
+{
+ TRX *found;
+ my_bool can;
+
+ if (trid < trx->min_read_from)
+ return TRUE;
+ if (trid > trx->trid)
+ return FALSE;
+
+ found= lf_hash_search(&trid_to_trx, trx->pins, &trid, sizeof(trid));
+ if (!found)
+ return FALSE; /* not in the hash = cannot read */
+
+ can= found->commit_trid < trx->trid;
+ lf_unpin(trx->pins, 2);
+ return can;
+}
+