Diffstat (limited to 'mysys')
-rw-r--r--  mysys/lf_alloc-pin.c  188
-rw-r--r--  mysys/lf_dynarray.c    10
-rw-r--r--  mysys/lf_hash.c       167
3 files changed, 271 insertions, 94 deletions
diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c
index e964553a64c..51c4df7c94a 100644
--- a/mysys/lf_alloc-pin.c
+++ b/mysys/lf_alloc-pin.c
@@ -1,5 +1,5 @@
/* QQ: TODO multi-pinbox */
-/* Copyright (C) 2000 MySQL AB
+/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -32,10 +32,11 @@
Pins are used to solve the ABA problem. To use pins one must obey
a pinning protocol:
+
1. Let's assume that PTR is a shared pointer to an object. Shared means
that any thread may modify it anytime to point to a different object
and free the old object. Later the freed object may be potentially
- allocated by another thread. If we're unlucky that another thread may
+ allocated by another thread. If we're unlucky that other thread may
set PTR to point to this object again. This is the ABA problem.
2. Create a local pointer LOCAL_PTR.
3. Pin the PTR in a loop:
@@ -70,12 +71,34 @@
pins you have is limited (and small), keeping an object pinned
prevents its reuse and causes unnecessary mallocs.
+ Explanations:
+
+ 3. The loop is important. The following can occur:
+ thread1> LOCAL_PTR= PTR
+ thread2> free(PTR); PTR=0;
+ thread1> pin(PTR, PIN_NUMBER);
+ now thread1 cannot access LOCAL_PTR, even if it's pinned,
+      because it points to freed memory. That is, it *must*
+ verify that it has indeed pinned PTR, the shared pointer.
+
+ 6. When a thread wants to free some LOCAL_PTR, and it scans
+ all lists of pins to see whether it's pinned, it does it
+ upwards, from low pin numbers to high. Thus another thread
+ must copy an address from one pin to another in the same
+ direction - upwards, otherwise the scanning thread may
+ miss it.
+
Implementation details:
+
Pins are given away from a "pinbox". A pinbox is a stack-based allocator.
It uses a dynarray for storing pins; new elements are allocated by the dynarray
as necessary, old ones are pushed on the stack for reuse. ABA is solved by
- versioning a pointer - because we use an array, a pointer to pins is 32 bit,
- upper 32 bits are used for a version.
+ versioning a pointer - because we use an array, a pointer to pins is 16 bit,
+ upper 16 bits are used for a version.
+
+ It is assumed that pins belong to a thread and are not transferable
+ between threads (LF_PINS::stack_ends_here being a primary reason
+ for this limitation).
*/
#include <my_global.h>
@@ -93,11 +116,11 @@ static void _lf_pinbox_real_free(LF_PINS *pins);
void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
lf_pinbox_free_func *free_func, void *free_func_arg)
{
- DBUG_ASSERT(sizeof(LF_PINS) == 128);
DBUG_ASSERT(free_ptr_offset % sizeof(void *) == 0);
- lf_dynarray_init(&pinbox->pinstack, sizeof(LF_PINS));
+ compile_time_assert(sizeof(LF_PINS) == 128);
+ lf_dynarray_init(&pinbox->pinarray, sizeof(LF_PINS));
pinbox->pinstack_top_ver= 0;
- pinbox->pins_in_stack= 0;
+ pinbox->pins_in_array= 0;
pinbox->free_ptr_offset= free_ptr_offset;
pinbox->free_func= free_func;
pinbox->free_func_arg= free_func_arg;
@@ -105,38 +128,72 @@ void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
void lf_pinbox_destroy(LF_PINBOX *pinbox)
{
- lf_dynarray_destroy(&pinbox->pinstack);
+ lf_dynarray_destroy(&pinbox->pinarray);
}
/*
Get pins from a pinbox. Usually called via lf_alloc_get_pins() or
lf_hash_get_pins().
+  SYNOPSIS
+ pinbox -
+ stack_end - a pointer to the end (top/bottom, depending on the
+             STACK_DIRECTION) of the stack. Used for safe alloca. There's
+ no safety margin deducted, a caller should take care of it,
+ if necessary.
+
DESCRIPTION
get a new LF_PINS structure from a stack of unused pins,
or allocate a new one out of dynarray.
+
+ NOTE
+ It is assumed that pins belong to a thread and are not transferable
+ between threads.
*/
-LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
+LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end)
{
uint32 pins, next, top_ver;
LF_PINS *el;
-
+ /*
+ We have an array of max. 64k elements.
+ The highest index currently allocated is pinbox->pins_in_array.
+ Freed elements are in a lifo stack, pinstack_top_ver.
+ pinstack_top_ver is 32 bits; 16 low bits are the index in the
+ array, to the first element of the list. 16 high bits are a version
+ (every time the 16 low bits are updated, the 16 high bits are
+    incremented). Versioning prevents the ABA problem.
+ */
top_ver= pinbox->pinstack_top_ver;
do
{
if (!(pins= top_ver % LF_PINBOX_MAX_PINS))
{
- pins= my_atomic_add32(&pinbox->pins_in_stack, 1)+1;
- el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins);
+ /* the stack of free elements is empty */
+ pins= my_atomic_add32(&pinbox->pins_in_array, 1)+1;
+ if (unlikely(pins >= LF_PINBOX_MAX_PINS))
+ return 0;
+ /*
+ note that the first allocated element has index 1 (pins==1).
+ index 0 is reserved to mean "NULL pointer"
+ */
+ el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinarray, pins);
+ if (unlikely(!el))
+ return 0;
break;
}
- el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins);
+ el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinarray, pins);
next= el->link;
} while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
top_ver-pins+next+LF_PINBOX_MAX_PINS));
+ /*
+    set el->link to the index of el in the dynarray (el->link has two usages:
+    - if the element is allocated, it's its own index
+    - if the element is free, it's the index of the next element in the free stack)
+ */
el->link= pins;
el->purgatory_count= 0;
el->pinbox= pinbox;
+ el->stack_ends_here= stack_end;
return el;
}
@@ -171,25 +228,17 @@ void _lf_pinbox_put_pins(LF_PINS *pins)
_lf_pinbox_real_free(pins);
if (pins->purgatory_count)
{
- my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock);
+ my_atomic_rwlock_wrunlock(&pins->pinbox->pinarray.lock);
pthread_yield();
- my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock);
+ my_atomic_rwlock_wrlock(&pins->pinbox->pinarray.lock);
}
}
top_ver= pinbox->pinstack_top_ver;
- if (nr == pinbox->pins_in_stack)
- {
- int32 tmp= nr;
- if (my_atomic_cas32(&pinbox->pins_in_stack, &tmp, tmp-1))
- goto ret;
- }
-
do
{
pins->link= top_ver % LF_PINBOX_MAX_PINS;
} while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
top_ver-pins->link+nr+LF_PINBOX_MAX_PINS));
-ret:
return;
}
@@ -228,7 +277,7 @@ struct st_harvester {
/*
callback for _lf_dynarray_iterate:
- scan all pins or all threads and accumulate all pins
+ scan all pins of all threads and accumulate all pins
*/
static int harvest_pins(LF_PINS *el, struct st_harvester *hv)
{
@@ -243,13 +292,19 @@ static int harvest_pins(LF_PINS *el, struct st_harvester *hv)
*hv->granary++= p;
}
}
+ /*
+ hv->npins may become negative below, but it means that
+ we're on the last dynarray page and harvest_pins() won't be
+    called again. We don't bother to make hv->npins correct
+    (that is, 0) in this case.
+ */
hv->npins-= LF_DYNARRAY_LEVEL_LENGTH;
return 0;
}
/*
callback for _lf_dynarray_iterate:
- scan all pins or all threads and see if addr is present there
+ scan all pins of all threads and see if addr is present there
*/
static int match_pins(LF_PINS *el, void *addr)
{
@@ -262,28 +317,35 @@ static int match_pins(LF_PINS *el, void *addr)
return 0;
}
+#if STACK_DIRECTION < 0
+#define available_stack_size(END,CUR) (long) ((char*)(CUR) - (char*)(END))
+#else
+#define available_stack_size(END,CUR) (long) ((char*)(END) - (char*)(CUR))
+#endif
+
/*
- Scan the purgatory as free everything that can be freed
+ Scan the purgatory and free everything that can be freed
*/
static void _lf_pinbox_real_free(LF_PINS *pins)
{
- int npins;
- void *list;
- void **addr;
+ int npins, alloca_size;
+ void *list, **addr;
+ struct st_lf_alloc_node *first, *last= NULL;
LF_PINBOX *pinbox= pins->pinbox;
- npins= pinbox->pins_in_stack+1;
+ npins= pinbox->pins_in_array+1;
#ifdef HAVE_ALLOCA
+ alloca_size= sizeof(void *)*LF_PINBOX_PINS*npins;
/* create a sorted list of pinned addresses, to speed up searches */
- if (sizeof(void *)*LF_PINBOX_PINS*npins < my_thread_stack_size)
+ if (available_stack_size(&pinbox, pins->stack_ends_here) > alloca_size)
{
struct st_harvester hv;
- addr= (void **) alloca(sizeof(void *)*LF_PINBOX_PINS*npins);
+ addr= (void **) alloca(alloca_size);
hv.granary= addr;
hv.npins= npins;
/* scan the dynarray and accumulate all pinned addresses */
- _lf_dynarray_iterate(&pinbox->pinstack,
+ _lf_dynarray_iterate(&pinbox->pinarray,
(lf_dynarray_func)harvest_pins, &hv);
npins= hv.granary-addr;
@@ -307,7 +369,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
if (addr) /* use binary search */
{
void **a, **b, **c;
- for (a= addr, b= addr+npins-1, c= a+(b-a)/2; b-a>1; c= a+(b-a)/2)
+ for (a= addr, b= addr+npins-1, c= a+(b-a)/2; (b-a) > 1; c= a+(b-a)/2)
if (cur == *c)
a= b= c;
else if (cur > *c)
@@ -319,41 +381,52 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
}
else /* no alloca - no cookie. linear search here */
{
- if (_lf_dynarray_iterate(&pinbox->pinstack,
+ if (_lf_dynarray_iterate(&pinbox->pinarray,
(lf_dynarray_func)match_pins, cur))
goto found;
}
}
/* not pinned - freeing */
- pinbox->free_func(cur, pinbox->free_func_arg);
+ if (last)
+ last= last->next= (struct st_lf_alloc_node *)cur;
+ else
+ first= last= (struct st_lf_alloc_node *)cur;
continue;
found:
/* pinned - keeping */
add_to_purgatory(pins, cur);
}
+ if (last)
+ pinbox->free_func(first, last, pinbox->free_func_arg);
}
+/* lock-free memory allocator for fixed-size objects */
+
+LF_REQUIRE_PINS(1);
+
/*
- callback for _lf_pinbox_real_free to free an unpinned object -
+ callback for _lf_pinbox_real_free to free a list of unpinned objects -
add it back to the allocator stack
+
+ DESCRIPTION
+ 'first' and 'last' are the ends of the linked list of st_lf_alloc_node's:
+ first->el->el->....->el->last. Use first==last to free only one element.
*/
-static void alloc_free(struct st_lf_alloc_node *node, LF_ALLOCATOR *allocator)
+static void alloc_free(struct st_lf_alloc_node *first,
+ struct st_lf_alloc_node *last,
+ LF_ALLOCATOR *allocator)
{
struct st_lf_alloc_node *tmp;
tmp= allocator->top;
do
{
- node->next= tmp;
- } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, node) &&
+ last->next= tmp;
+ } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, first) &&
LF_BACKOFF);
}
-/* lock-free memory allocator for fixed-size objects */
-
-LF_REQUIRE_PINS(1);
-
/*
- initialize lock-free allocatod.
+ initialize lock-free allocator
SYNOPSIS
allocator -
@@ -362,6 +435,8 @@ LF_REQUIRE_PINS(1);
memory that is guaranteed to be unused after
the object is put in the purgatory. Unused by ANY
thread, not only the purgatory owner.
+ This memory will be used to link waiting-to-be-freed
+ objects in a purgatory list.
*/
void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
{
@@ -370,12 +445,19 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
allocator->top= 0;
allocator->mallocs= 0;
allocator->element_size= size;
- DBUG_ASSERT(size >= (int)sizeof(void *));
- DBUG_ASSERT(free_ptr_offset < size);
+ DBUG_ASSERT(size >= sizeof(void*) + free_ptr_offset);
}
/*
destroy the allocator, free everything that's in it
+
+ NOTE
+    Like every other init/destroy function here and elsewhere, it
+    is not thread-safe. No, this function is no different; ensure
+    that no thread needs the allocator before destroying it.
+ We are not responsible for any damage that may be caused by
+ accessing the allocator when it is being or has been destroyed.
+ Oh yes, and don't put your cat in a microwave.
*/
void lf_alloc_destroy(LF_ALLOCATOR *allocator)
{
@@ -410,16 +492,14 @@ void *_lf_alloc_new(LF_PINS *pins)
} while (node != allocator->top && LF_BACKOFF);
if (!node)
{
- if (!(node= (void *)my_malloc(allocator->element_size,
- MYF(MY_WME|MY_ZEROFILL))))
- break;
+ node= (void *)my_malloc(allocator->element_size, MYF(MY_WME));
#ifdef MY_LF_EXTRA_DEBUG
- my_atomic_add32(&allocator->mallocs, 1);
+ if (likely(node))
+ my_atomic_add32(&allocator->mallocs, 1);
#endif
break;
}
- if (my_atomic_casptr((void **)&allocator->top,
- (void *)&node, *(void **)node))
+ if (my_atomic_casptr((void **)&allocator->top, (void *)&node, node->next))
break;
}
_lf_unpin(pins, 0);
@@ -432,7 +512,7 @@ void *_lf_alloc_new(LF_PINS *pins)
NOTE
This is NOT thread-safe !!!
*/
-uint lf_alloc_in_pool(LF_ALLOCATOR *allocator)
+uint lf_alloc_pool_count(LF_ALLOCATOR *allocator)
{
uint i;
struct st_lf_alloc_node *node;
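For illustration, a minimal sketch of the pinning protocol (steps 2-5 of
the comment at the top of lf_alloc-pin.c) as a user of this API might
follow it. It is not part of the patch: node_t and read_head_safely() are
made-up names, and lf_pin()/lf_unpin() are assumed to be the pinning
wrappers declared in lf.h.

  #include <my_global.h>
  #include <my_sys.h>
  #include <lf.h>

  /* a made-up node type, just to have something to pin */
  typedef struct node_t { struct node_t * volatile next; int payload; } node_t;

  /*
    Steps 2-3: copy the shared pointer, pin the copy, and loop until the
    shared pointer is verified to be unchanged (i.e. we really pinned PTR).
  */
  static node_t *read_head_safely(node_t * volatile *shared_ptr, LF_PINS *pins)
  {
    node_t *local;
    do
    {
      local= *shared_ptr;            /* 2. LOCAL_PTR= PTR                */
      lf_pin(pins, 0, local);        /* 3. pin(PTR, PIN_NUMBER)          */
    } while (*shared_ptr != local);  /*    ... and verify in a loop      */
    /* 4. 'local' may now be dereferenced even if PTR changes meanwhile  */
    return local;                    /* 5. caller does lf_unpin(pins, 0) when done */
  }

A thread would typically obtain 'pins' once, e.g. via lf_alloc_get_pins(),
and keep them for its lifetime, since pins are not transferable between
threads.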
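To see how the 16-bit index / 16-bit version split used by
_lf_pinbox_get_pins() protects the free-list head from ABA, a worked
example (assuming LF_PINBOX_MAX_PINS is 65536, which matches the 16-bit
index described in the comment):

  top_ver           = 3*65536 + 17    (version 3, free list head at index 17)
  el->link          = 9               (next free element after #17)
  the CAS installs    top_ver - pins + next + LF_PINBOX_MAX_PINS
                    = (3*65536 + 17) - 17 + 9 + 65536
                    = 4*65536 + 9     (version 4, free list head at index 9)

If another thread pops element #17 and pushes it back in the meantime, the
index part is the same but the version part has advanced, so the stale CAS
fails and the loop retries with a fresh top_ver.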
diff --git a/mysys/lf_dynarray.c b/mysys/lf_dynarray.c
index c6dd654bf03..770b1f9342b 100644
--- a/mysys/lf_dynarray.c
+++ b/mysys/lf_dynarray.c
@@ -1,4 +1,4 @@
-/* Copyright (C) 2000 MySQL AB
+/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -21,8 +21,6 @@
Memory is allocated in non-contiguous chunks.
This data structure is not space efficient for sparse arrays.
- The number of elements is limited to 4311810304
-
Every element is aligned to sizeof(element) boundary
(to avoid false sharing if element is big enough).
@@ -32,6 +30,9 @@
to arrays of elements, on the second level it's an array of pointers
to arrays of pointers to arrays of elements. And so on.
+ With four levels the number of elements is limited to 4311810304
+  (but since the index is a uint in all functions, the real limit is 2^32-1)
+
Actually, it's wait-free, not lock-free ;-)
*/
@@ -192,6 +193,9 @@ static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level,
each. _lf_dynarray_iterate() calls user-supplied function on every array
from the set. It is the fastest way to scan the array, faster than
for (i=0; i < N; i++) { func(_lf_dynarray_value(dynarray, i)); }
+
+ NOTE
+ if func() returns non-zero, the scan is aborted
*/
int _lf_dynarray_iterate(LF_DYNARRAY *array, lf_dynarray_func func, void *arg)
{
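The 4311810304 limit quoted above is consistent with four levels of 256
entries each (a reader's check, assuming LF_DYNARRAY_LEVEL_LENGTH is 256):

  256 + 256^2 + 256^3 + 256^4
  = 256 + 65536 + 16777216 + 4294967296
  = 4311810304

that is, a directly indexed first level plus three levels of indirection;
and since every index is a 32-bit uint, only the first 2^32-1 of those
slots are actually reachable, as the new comment says.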
diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c
index fb2fb88492f..832f0eb5852 100644
--- a/mysys/lf_hash.c
+++ b/mysys/lf_hash.c
@@ -1,4 +1,4 @@
-/* Copyright (C) 2000 MySQL AB
+/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -36,6 +36,10 @@ typedef struct {
uint32 hashnr; /* reversed hash number, for sorting */
const byte *key;
uint keylen;
+ /*
+ data is stored here, directly after the keylen.
+ thus the pointer to data is (void*)(slist_element_ptr+1)
+ */
} LF_SLIST;
/*
@@ -77,20 +81,20 @@ static int lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
retry:
cursor->prev= (intptr *)head;
- do {
- cursor->curr= PTR(*cursor->prev);
+ do { /* PTR() isn't necessary below, head is a dummy node */
+ cursor->curr= (LF_SLIST *)(*cursor->prev);
_lf_pin(pins, 1, cursor->curr);
- } while(*cursor->prev != (intptr)cursor->curr && LF_BACKOFF);
+ } while (*cursor->prev != (intptr)cursor->curr && LF_BACKOFF);
for (;;)
{
- if (!cursor->curr)
- return 0;
+ if (unlikely(!cursor->curr))
+ return 0; /* end of the list */
do {
/* QQ: XXX or goto retry ? */
link= cursor->curr->link;
cursor->next= PTR(link);
_lf_pin(pins, 0, cursor->next);
- } while(link != cursor->curr->link && LF_BACKOFF);
+ } while (link != cursor->curr->link && LF_BACKOFF);
cur_hashnr= cursor->curr->hashnr;
cur_key= cursor->curr->key;
cur_keylen= cursor->curr->keylen;
@@ -114,6 +118,10 @@ retry:
}
else
{
+ /*
+ we found a deleted node - be nice, help the other thread
+ and remove this deleted node
+ */
if (my_atomic_casptr((void **)cursor->prev,
(void **)&cursor->curr, cursor->next))
_lf_alloc_free(pins, cursor->curr);
@@ -139,31 +147,44 @@ retry:
NOTE
it uses pins[0..2], on return all pins are removed.
+ if there're nodes with the same key value, a new node is added before them.
*/
static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs,
LF_SLIST *node, LF_PINS *pins, uint flags)
{
CURSOR cursor;
- int res= -1;
+ int res;
- do
+ for (;;)
{
if (lfind(head, cs, node->hashnr, node->key, node->keylen,
&cursor, pins) &&
(flags & LF_HASH_UNIQUE))
+ {
res= 0; /* duplicate found */
+ break;
+ }
else
{
node->link= (intptr)cursor.curr;
- assert(node->link != (intptr)node);
- assert(cursor.prev != &node->link);
+ DBUG_ASSERT(node->link != (intptr)node); /* no circular references */
+ DBUG_ASSERT(cursor.prev != &node->link); /* no circular references */
if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
+ {
res= 1; /* inserted ok */
+ break;
+ }
}
- } while (res == -1);
+ }
_lf_unpin(pins, 0);
_lf_unpin(pins, 1);
_lf_unpin(pins, 2);
+ /*
+ Note that cursor.curr is not pinned here and the pointer is unreliable,
+    the object may disappear anytime. But if it points to a dummy node, the
+ pointer is safe, because dummy nodes are never freed - initialize_bucket()
+ uses this fact.
+ */
return res ? 0 : cursor.curr;
}
@@ -183,24 +204,41 @@ static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
const byte *key, uint keylen, LF_PINS *pins)
{
CURSOR cursor;
- int res= -1;
+ int res;
- do
+ for (;;)
{
if (!lfind(head, cs, hashnr, key, keylen, &cursor, pins))
- res= 1;
+ {
+ res= 1; /* not found */
+ break;
+ }
else
+ {
+ /* mark the node deleted */
if (my_atomic_casptr((void **)&(cursor.curr->link),
- (void **)&cursor.next, 1+(char *)cursor.next))
+ (void **)&cursor.next,
+ (void *)(((intptr)cursor.next) | 1)))
{
+ /* and remove it from the list */
if (my_atomic_casptr((void **)cursor.prev,
(void **)&cursor.curr, cursor.next))
_lf_alloc_free(pins, cursor.curr);
else
+ {
+ /*
+          somebody already "helped" us and removed the node?
+ Let's check if we need to help that someone too!
+ (to ensure the number of "set DELETED flag" actions
+ is equal to the number of "remove from the list" actions)
+ */
lfind(head, cs, hashnr, key, keylen, &cursor, pins);
+ }
res= 0;
+ break;
}
- } while (res == -1);
+ }
+ }
_lf_unpin(pins, 0);
_lf_unpin(pins, 1);
_lf_unpin(pins, 2);
@@ -226,7 +264,8 @@ static LF_SLIST *lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs,
{
CURSOR cursor;
int res= lfind(head, cs, hashnr, key, keylen, &cursor, pins);
- if (res) _lf_pin(pins, 2, cursor.curr);
+ if (res)
+ _lf_pin(pins, 2, cursor.curr);
_lf_unpin(pins, 0);
_lf_unpin(pins, 1);
return res ? cursor.curr : 0;
@@ -241,6 +280,11 @@ static inline const byte* hash_key(const LF_HASH *hash,
return record + hash->key_offset;
}
+/*
+ compute the hash key value from the raw key.
+  note that the hash value is limited to 2^31, because we need one
+ bit to distinguish between normal and dummy nodes.
+*/
static inline uint calc_hash(LF_HASH *hash, const byte *key, uint keylen)
{
ulong nr1= 1, nr2= 4;
@@ -249,8 +293,9 @@ static inline uint calc_hash(LF_HASH *hash, const byte *key, uint keylen)
return nr1 & INT_MAX32;
}
-#define MAX_LOAD 1.0
-static void initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
+#define MAX_LOAD 1.0 /* average number of elements in a bucket */
+
+static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
/*
Initializes lf_hash, the arguments are compatible with hash_init
@@ -261,7 +306,7 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
{
lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
offsetof(LF_SLIST, key));
- lf_dynarray_init(&hash->array, sizeof(LF_SLIST **));
+ lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
hash->size= 1;
hash->count= 0;
hash->element_size= element_size;
@@ -275,14 +320,19 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
void lf_hash_destroy(LF_HASH *hash)
{
- LF_SLIST *el= *(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0);
+ LF_SLIST *el, **head= (LF_SLIST **)_lf_dynarray_value(&hash->array, 0);
+
+ if (unlikely(!head))
+ return;
+ el= *head;
+
while (el)
{
intptr next= el->link;
if (el->hashnr & 1)
- lf_alloc_real_free(&hash->alloc, el);
+ lf_alloc_direct_free(&hash->alloc, el); /* normal node */
else
- my_free((void *)el, MYF(0));
+ my_free((void *)el, MYF(0)); /* dummy node */
el= (LF_SLIST *)next;
}
lf_alloc_destroy(&hash->alloc);
@@ -297,6 +347,7 @@ void lf_hash_destroy(LF_HASH *hash)
RETURN
0 - inserted
1 - didn't (unique key conflict)
+ -1 - out of memory
NOTE
see linsert() for pin usage notes
@@ -308,14 +359,18 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
lf_rwlock_by_pins(pins);
node= (LF_SLIST *)_lf_alloc_new(pins);
+ if (unlikely(!node))
+ return -1;
memcpy(node+1, data, hash->element_size);
node->key= hash_key(hash, (byte *)(node+1), &node->keylen);
hashnr= calc_hash(hash, node->key, node->keylen);
bucket= hashnr % hash->size;
el= _lf_dynarray_lvalue(&hash->array, bucket);
- if (*el == NULL)
- initialize_bucket(hash, el, bucket, pins);
- node->hashnr= my_reverse_bits(hashnr) | 1;
+ if (unlikely(!el))
+ return -1;
+ if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
+ return -1;
+ node->hashnr= my_reverse_bits(hashnr) | 1; /* normal node */
if (linsert(el, hash->charset, node, pins, hash->flags))
{
_lf_alloc_free(pins, node);
@@ -330,9 +385,14 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
}
/*
+ DESCRIPTION
+ deletes an element with the given key from the hash (if a hash is
+ not unique and there're many elements with this key - the "first"
+ matching element is deleted)
RETURN
0 - deleted
1 - didn't (not found)
+ -1 - out of memory
NOTE
see ldelete() for pin usage notes
*/
@@ -344,8 +404,16 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
bucket= hashnr % hash->size;
lf_rwlock_by_pins(pins);
el= _lf_dynarray_lvalue(&hash->array, bucket);
- if (*el == NULL)
- initialize_bucket(hash, el, bucket, pins);
+ if (unlikely(!el))
+ return -1;
+ /*
+ note that we still need to initialize_bucket here,
+ we cannot return "node not found", because an old bucket of that
+ node may've been split and the node was assigned to a new bucket
+ that was never accessed before and thus is not initialized.
+ */
+ if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
+ return -1;
if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
(byte *)key, keylen, pins))
{
@@ -358,6 +426,12 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
}
/*
+ RETURN
+ a pointer to an element with the given key (if a hash is not unique and
+ there're many elements with this key - the "first" matching element)
+ NULL if nothing is found
+ MY_ERRPTR if OOM
+
NOTE
see lsearch() for pin usage notes
*/
@@ -369,32 +443,51 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
bucket= hashnr % hash->size;
lf_rwlock_by_pins(pins);
el= _lf_dynarray_lvalue(&hash->array, bucket);
- if (*el == NULL)
- initialize_bucket(hash, el, bucket, pins);
+ if (unlikely(!el))
+ return MY_ERRPTR;
+ if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
+ return MY_ERRPTR;
found= lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
(byte *)key, keylen, pins);
lf_rwunlock_by_pins(pins);
return found ? found+1 : 0;
}
-static const char *dummy_key= "";
+static const byte *dummy_key= "";
-static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
+/*
+ RETURN
+ 0 - ok
+ -1 - out of memory
+*/
+static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
uint bucket, LF_PINS *pins)
{
uint parent= my_clear_highest_bit(bucket);
LF_SLIST *dummy= (LF_SLIST *)my_malloc(sizeof(LF_SLIST), MYF(MY_WME));
LF_SLIST **tmp= 0, *cur;
LF_SLIST * volatile *el= _lf_dynarray_lvalue(&hash->array, parent);
- if (*el == NULL && bucket)
- initialize_bucket(hash, el, parent, pins);
- dummy->hashnr= my_reverse_bits(bucket);
+ if (unlikely(!el || !dummy))
+ return -1;
+ if (*el == NULL && bucket &&
+ unlikely(initialize_bucket(hash, el, parent, pins)))
+ return -1;
+ dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */
dummy->key= (char*) dummy_key;
dummy->keylen= 0;
- if ((cur= linsert(el, hash->charset, dummy, pins, 0)))
+ if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE)))
{
my_free((void *)dummy, MYF(0));
dummy= cur;
}
my_atomic_casptr((void **)node, (void **)&tmp, dummy);
+ /*
+ note that if the CAS above failed (after linsert() succeeded),
+ it would mean that some other thread has executed linsert() for
+ the same dummy node, its linsert() failed, it picked up our
+ dummy node (in "dummy= cur") and executed the same CAS as above.
+ Which means that even if CAS above failed we don't need to retry,
+ and we should not free(dummy) - there's no memory leak here
+ */
+ return 0;
}
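A worked illustration of the split-ordered layout lf_hash.c relies on,
using 8-bit values for brevity (the real code reverses 32 bits; the
function names are the ones used in this file):

  bucket 5       = 00000101, dummy hashnr = my_reverse_bits(5)      = 10100000
  key hashnr 13  = 00001101, bucket 13 % 8 = 5,
                   node hashnr = my_reverse_bits(13) | 1            = 10110001

Sorted by these reversed values the node (10110001) sits right after its
bucket's dummy (10100000) and before the next dummy (bucket 3, 11000000).
When the hash later grows to 16 buckets, the element's bucket becomes 13,
and initialize_bucket() only has to insert a new dummy with hashnr
10110000, which lands directly in front of the element - no existing node
ever moves. The parent bucket used for that recursion is
my_clear_highest_bit(13) = 5 (binary 1101 -> 0101).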