Diffstat (limited to 'mysys')
-rw-r--r--  mysys/lf_alloc-pin.c | 188
-rw-r--r--  mysys/lf_dynarray.c  |  10
-rw-r--r--  mysys/lf_hash.c      | 167
3 files changed, 271 insertions(+), 94 deletions(-)
diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c
index e964553a64c..51c4df7c94a 100644
--- a/mysys/lf_alloc-pin.c
+++ b/mysys/lf_alloc-pin.c
@@ -1,5 +1,5 @@
 /* QQ: TODO multi-pinbox */
-/* Copyright (C) 2000 MySQL AB
+/* Copyright (C) 2006 MySQL AB

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -32,10 +32,11 @@
   Pins are used to solve ABA problem. To use pins one must obey
   a pinning protocol:
+
    1. Let's assume that PTR is a shared pointer to an object. Shared means
       that any thread may modify it anytime to point to a different object
       and free the old object. Later the freed object may be potentially
-      allocated by another thread. If we're unlucky that another thread may
+      allocated by another thread. If we're unlucky that other thread may
       set PTR to point to this object again. This is ABA problem.
    2. Create a local pointer LOCAL_PTR.
    3. Pin the PTR in a loop:
@@ -70,12 +71,34 @@
   pins you have is limited (and small), keeping an object pinned
   prevents its reuse and cause unnecessary mallocs.

+  Explanations:
+
+   3. The loop is important. The following can occur:
+        thread1> LOCAL_PTR= PTR
+        thread2> free(PTR); PTR=0;
+        thread1> pin(PTR, PIN_NUMBER);
+      now thread1 cannot access LOCAL_PTR, even if it's pinned,
+      because it points to a freed memory. That is, it *must*
+      verify that it has indeed pinned PTR, the shared pointer.
+
+   6. When a thread wants to free some LOCAL_PTR, and it scans
+      all lists of pins to see whether it's pinned, it does it
+      upwards, from low pin numbers to high. Thus another thread
+      must copy an address from one pin to another in the same
+      direction - upwards, otherwise the scanning thread may
+      miss it.
+
   Implementation details:
+
   Pins are given away from a "pinbox". Pinbox is stack-based allocator.
   It used dynarray for storing pins, new elements are allocated by dynarray
   as necessary, old are pushed in the stack for reuse. ABA is solved by
-  versioning a pointer - because we use an array, a pointer to pins is 32 bit,
-  upper 32 bits are used for a version.
+  versioning a pointer - because we use an array, a pointer to pins is 16 bit,
+  upper 16 bits are used for a version.
+
+  It is assumed that pins belong to a thread and are not transferable
+  between threads (LF_PINS::stack_ends_here being a primary reason
+  for this limitation).
 */
 #include <my_global.h>
@@ -93,11 +116,11 @@ static void _lf_pinbox_real_free(LF_PINS *pins);
 void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
                     lf_pinbox_free_func *free_func, void *free_func_arg)
 {
-  DBUG_ASSERT(sizeof(LF_PINS) == 128);
   DBUG_ASSERT(free_ptr_offset % sizeof(void *) == 0);
-  lf_dynarray_init(&pinbox->pinstack, sizeof(LF_PINS));
+  compile_time_assert(sizeof(LF_PINS) == 128);
+  lf_dynarray_init(&pinbox->pinarray, sizeof(LF_PINS));
   pinbox->pinstack_top_ver= 0;
-  pinbox->pins_in_stack= 0;
+  pinbox->pins_in_array= 0;
   pinbox->free_ptr_offset= free_ptr_offset;
   pinbox->free_func= free_func;
   pinbox->free_func_arg= free_func_arg;
@@ -105,38 +128,72 @@ void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,

 void lf_pinbox_destroy(LF_PINBOX *pinbox)
 {
-  lf_dynarray_destroy(&pinbox->pinstack);
+  lf_dynarray_destroy(&pinbox->pinarray);
 }

 /*
   Get pins from a pinbox. Usually called via lf_alloc_get_pins() or
   lf_hash_get_pins().

+  SYNOPSYS
+    pinbox      -
+    stack_end   - a pointer to the end (top/bottom, depending on the
+                  STACK_DIRECTION) of stack. Used for safe alloca. There's
+                  no safety margin deducted, a caller should take care of it,
+                  if necessary.
+
   DESCRIPTION
     get a new LF_PINS structure from a stack of unused pins,
     or allocate a new one out of dynarray.
+
+  NOTE
+    It is assumed that pins belong to a thread and are not transferable
+    between threads.
 */
-LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
+LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end)
 {
   uint32 pins, next, top_ver;
   LF_PINS *el;
-
+  /*
+    We have an array of max. 64k elements.
+    The highest index currently allocated is pinbox->pins_in_array.
+    Freed elements are in a lifo stack, pinstack_top_ver.
+    pinstack_top_ver is 32 bits; 16 low bits are the index in the
+    array, to the first element of the list. 16 high bits are a version
+    (every time the 16 low bits are updated, the 16 high bits are
+    incremented). Versioniong prevents the ABA problem.
+  */
   top_ver= pinbox->pinstack_top_ver;
   do
   {
     if (!(pins= top_ver % LF_PINBOX_MAX_PINS))
     {
-      pins= my_atomic_add32(&pinbox->pins_in_stack, 1)+1;
-      el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins);
+      /* the stack of free elements is empty */
+      pins= my_atomic_add32(&pinbox->pins_in_array, 1)+1;
+      if (unlikely(pins >= LF_PINBOX_MAX_PINS))
+        return 0;
+      /*
+        note that the first allocated element has index 1 (pins==1).
+        index 0 is reserved to mean "NULL pointer"
+      */
+      el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinarray, pins);
+      if (unlikely(!el))
+        return 0;
       break;
     }
-    el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins);
+    el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinarray, pins);
     next= el->link;
   } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
                             top_ver-pins+next+LF_PINBOX_MAX_PINS));
+  /*
+    set el->link to the index of el in the dynarray (el->link has two usages:
+    - if element is allocated, it's its own index
+    - if element is free, it's its next element in the free stack
+  */
   el->link= pins;
   el->purgatory_count= 0;
   el->pinbox= pinbox;
+  el->stack_ends_here= stack_end;
   return el;
 }
@@ -171,25 +228,17 @@ void _lf_pinbox_put_pins(LF_PINS *pins)
       _lf_pinbox_real_free(pins);
       if (pins->purgatory_count)
       {
-        my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock);
+        my_atomic_rwlock_wrunlock(&pins->pinbox->pinarray.lock);
         pthread_yield();
-        my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock);
+        my_atomic_rwlock_wrlock(&pins->pinbox->pinarray.lock);
       }
     }
   top_ver= pinbox->pinstack_top_ver;
-  if (nr == pinbox->pins_in_stack)
-  {
-    int32 tmp= nr;
-    if (my_atomic_cas32(&pinbox->pins_in_stack, &tmp, tmp-1))
-      goto ret;
-  }
-
   do
   {
     pins->link= top_ver % LF_PINBOX_MAX_PINS;
   } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
                             top_ver-pins->link+nr+LF_PINBOX_MAX_PINS));
-ret:
   return;
 }
@@ -228,7 +277,7 @@ struct st_harvester {

 /*
   callback for _lf_dynarray_iterate:
-  scan all pins or all threads and accumulate all pins
+  scan all pins of all threads and accumulate all pins
 */
 static int harvest_pins(LF_PINS *el, struct st_harvester *hv)
 {
@@ -243,13 +292,19 @@ static int harvest_pins(LF_PINS *el, struct st_harvester *hv)
         *hv->granary++= p;
     }
   }
+  /*
+    hv->npins may become negative below, but it means that
+    we're on the last dynarray page and harvest_pins() won't be
+    called again. We don't bother to make hv->npins() correct
+    (that is 0) in this case.
+  */
   hv->npins-= LF_DYNARRAY_LEVEL_LENGTH;
   return 0;
 }

 /*
   callback for _lf_dynarray_iterate:
-  scan all pins or all threads and see if addr is present there
+  scan all pins of all threads and see if addr is present there
 */
 static int match_pins(LF_PINS *el, void *addr)
 {
@@ -262,28 +317,35 @@ static int match_pins(LF_PINS *el, void *addr)
   return 0;
 }

+#if STACK_DIRECTION < 0
+#define available_stack_size(END,CUR) (long) ((char*)(CUR) - (char*)(END))
+#else
+#define available_stack_size(END,CUR) (long) ((char*)(END) - (char*)(CUR))
+#endif
+
 /*
-  Scan the purgatory as free everything that can be freed
+  Scan the purgatory and free everything that can be freed
 */
 static void _lf_pinbox_real_free(LF_PINS *pins)
 {
-  int npins;
-  void *list;
-  void **addr;
+  int npins, alloca_size;
+  void *list, **addr;
+  struct st_lf_alloc_node *first, *last= NULL;
   LF_PINBOX *pinbox= pins->pinbox;

-  npins= pinbox->pins_in_stack+1;
+  npins= pinbox->pins_in_array+1;

 #ifdef HAVE_ALLOCA
+  alloca_size= sizeof(void *)*LF_PINBOX_PINS*npins;
   /* create a sorted list of pinned addresses, to speed up searches */
-  if (sizeof(void *)*LF_PINBOX_PINS*npins < my_thread_stack_size)
+  if (available_stack_size(&pinbox, pins->stack_ends_here) > alloca_size)
   {
     struct st_harvester hv;
-    addr= (void **) alloca(sizeof(void *)*LF_PINBOX_PINS*npins);
+    addr= (void **) alloca(alloca_size);
     hv.granary= addr;
     hv.npins= npins;
     /* scan the dynarray and accumulate all pinned addresses */
-    _lf_dynarray_iterate(&pinbox->pinstack,
+    _lf_dynarray_iterate(&pinbox->pinarray,
                          (lf_dynarray_func)harvest_pins, &hv);

     npins= hv.granary-addr;
@@ -307,7 +369,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
       if (addr) /* use binary search */
       {
         void **a, **b, **c;
-        for (a= addr, b= addr+npins-1, c= a+(b-a)/2; b-a>1; c= a+(b-a)/2)
+        for (a= addr, b= addr+npins-1, c= a+(b-a)/2; (b-a) > 1; c= a+(b-a)/2)
           if (cur == *c)
             a= b= c;
           else if (cur > *c)
@@ -319,41 +381,52 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
       }
       else /* no alloca - no cookie. linear search here */
       {
-        if (_lf_dynarray_iterate(&pinbox->pinstack,
+        if (_lf_dynarray_iterate(&pinbox->pinarray,
                                  (lf_dynarray_func)match_pins, cur))
           goto found;
       }
     }
     /* not pinned - freeing */
-    pinbox->free_func(cur, pinbox->free_func_arg);
+    if (last)
+      last= last->next= (struct st_lf_alloc_node *)cur;
+    else
+      first= last= (struct st_lf_alloc_node *)cur;
     continue;
 found:
     /* pinned - keeping */
     add_to_purgatory(pins, cur);
   }
+  if (last)
+    pinbox->free_func(first, last, pinbox->free_func_arg);
 }

+/* lock-free memory allocator for fixed-size objects */
+
+LF_REQUIRE_PINS(1);
+
 /*
-  callback for _lf_pinbox_real_free to free an unpinned object -
+  callback for _lf_pinbox_real_free to free a list of unpinned objects -
   add it back to the allocator stack
+
+  DESCRIPTION
+    'first' and 'last' are the ends of the linked list of st_lf_alloc_node's:
+    first->el->el->....->el->last. Use first==last to free only one element.
 */
-static void alloc_free(struct st_lf_alloc_node *node, LF_ALLOCATOR *allocator)
+static void alloc_free(struct st_lf_alloc_node *first,
+                       struct st_lf_alloc_node *last,
+                       LF_ALLOCATOR *allocator)
 {
   struct st_lf_alloc_node *tmp;
   tmp= allocator->top;
   do
   {
-    node->next= tmp;
-  } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, node) &&
+    last->next= tmp;
+  } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, first) &&
            LF_BACKOFF);
 }

-/* lock-free memory allocator for fixed-size objects */
-
-LF_REQUIRE_PINS(1);
-
 /*
-  initialize lock-free allocatod.
+  initialize lock-free allocator

   SYNOPSYS
     allocator           -
@@ -362,6 +435,8 @@ LF_REQUIRE_PINS(1);
                           memory that is guaranteed to be unused after the
                           object is put in the purgatory. Unused by ANY
                           thread, not only the purgatory owner.
+                          This memory will be used to link waiting-to-be-freed
+                          objects in a purgatory list.
 */
 void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
 {
@@ -370,12 +445,19 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
   allocator->top= 0;
   allocator->mallocs= 0;
   allocator->element_size= size;
-  DBUG_ASSERT(size >= (int)sizeof(void *));
-  DBUG_ASSERT(free_ptr_offset < size);
+  DBUG_ASSERT(size >= sizeof(void*) + free_ptr_offset);
 }

 /*
   destroy the allocator, free everything that's in it
+
+  NOTE
+    As every other init/destroy function here and elsewhere it
+    is not thread safe. No, this function is no different, ensure
+    that no thread needs the allocator before destroying it.
+    We are not responsible for any damage that may be caused by
+    accessing the allocator when it is being or has been destroyed.
+    Oh yes, and don't put your cat in a microwave.
 */
 void lf_alloc_destroy(LF_ALLOCATOR *allocator)
 {
@@ -410,16 +492,14 @@ void *_lf_alloc_new(LF_PINS *pins)
     } while (node != allocator->top && LF_BACKOFF);
     if (!node)
     {
-      if (!(node= (void *)my_malloc(allocator->element_size,
-                                    MYF(MY_WME|MY_ZEROFILL))))
-        break;
+      node= (void *)my_malloc(allocator->element_size, MYF(MY_WME));
 #ifdef MY_LF_EXTRA_DEBUG
-      my_atomic_add32(&allocator->mallocs, 1);
+      if (likely(node))
+        my_atomic_add32(&allocator->mallocs, 1);
 #endif
       break;
     }
-    if (my_atomic_casptr((void **)&allocator->top,
-                         (void *)&node, *(void **)node))
+    if (my_atomic_casptr((void **)&allocator->top, (void *)&node, node->next))
       break;
   }
   _lf_unpin(pins, 0);
@@ -432,7 +512,7 @@ void *_lf_alloc_new(LF_PINS *pins)
   NOTE
     This is NOT thread-safe !!!
 */
-uint lf_alloc_in_pool(LF_ALLOCATOR *allocator)
+uint lf_alloc_pool_count(LF_ALLOCATOR *allocator)
 {
   uint i;
   struct st_lf_alloc_node *node;
diff --git a/mysys/lf_dynarray.c b/mysys/lf_dynarray.c
index c6dd654bf03..770b1f9342b 100644
--- a/mysys/lf_dynarray.c
+++ b/mysys/lf_dynarray.c
@@ -1,4 +1,4 @@
-/* Copyright (C) 2000 MySQL AB
+/* Copyright (C) 2006 MySQL AB

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -21,8 +21,6 @@
   Memory is allocated in non-contiguous chunks.
   This data structure is not space efficient for sparse arrays.

-  The number of elements is limited to 4311810304
-
   Every element is aligned to sizeof(element) boundary
   (to avoid false sharing if element is big enough).

@@ -32,6 +30,9 @@
   to arrays of elements, on the second level it's an array of pointers
   to arrays of pointers to arrays of elements. And so on.

+  With four levels the number of elements is limited to 4311810304
+  (but as in all functions index is uint, the real limit is 2^32-1)
+
   Actually, it's wait-free, not lock-free ;-)
 */

@@ -192,6 +193,9 @@ static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level,
   each. _lf_dynarray_iterate() calls user-supplied function on every array
   from the set. It is the fastest way to scan the array, faster than
     for (i=0; i < N; i++) { func(_lf_dynarray_value(dynarray, i)); }
+
+  NOTE
+    if func() returns non-zero, the scan is aborted
 */
 int _lf_dynarray_iterate(LF_DYNARRAY *array, lf_dynarray_func func, void *arg)
 {
diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c
index fb2fb88492f..832f0eb5852 100644
--- a/mysys/lf_hash.c
+++ b/mysys/lf_hash.c
@@ -1,4 +1,4 @@
-/* Copyright (C) 2000 MySQL AB
+/* Copyright (C) 2006 MySQL AB

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -36,6 +36,10 @@ typedef struct {
   uint32 hashnr;                /* reversed hash number, for sorting */
   const byte *key;
   uint keylen;
+  /*
+    data is stored here, directly after the keylen.
+    thus the pointer to data is (void*)(slist_element_ptr+1)
+  */
 } LF_SLIST;

 /*
@@ -77,20 +81,20 @@ static int lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,

 retry:
   cursor->prev= (intptr *)head;
-  do {
-    cursor->curr= PTR(*cursor->prev);
+  do { /* PTR() isn't necessary below, head is a dummy node */
+    cursor->curr= (LF_SLIST *)(*cursor->prev);
     _lf_pin(pins, 1, cursor->curr);
-  } while(*cursor->prev != (intptr)cursor->curr && LF_BACKOFF);
+  } while (*cursor->prev != (intptr)cursor->curr && LF_BACKOFF);
   for (;;)
   {
-    if (!cursor->curr)
-      return 0;
+    if (unlikely(!cursor->curr))
+      return 0; /* end of the list */
     do { /* QQ: XXX or goto retry ? */
       link= cursor->curr->link;
       cursor->next= PTR(link);
       _lf_pin(pins, 0, cursor->next);
-    } while(link != cursor->curr->link && LF_BACKOFF);
+    } while (link != cursor->curr->link && LF_BACKOFF);
     cur_hashnr= cursor->curr->hashnr;
     cur_key= cursor->curr->key;
     cur_keylen= cursor->curr->keylen;
@@ -114,6 +118,10 @@ retry:
       }
       else
       {
+        /*
+          we found a deleted node - be nice, help the other thread
+          and remove this deleted node
+        */
         if (my_atomic_casptr((void **)cursor->prev,
                              (void **)&cursor->curr, cursor->next))
           _lf_alloc_free(pins, cursor->curr);
@@ -139,31 +147,44 @@ retry:

   NOTE
     it uses pins[0..2], on return all pins are removed.
+    if there're nodes with the same key value, a new node is added before them.
 */
 static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs,
                          LF_SLIST *node, LF_PINS *pins, uint flags)
 {
   CURSOR cursor;
-  int res= -1;
+  int res;

-  do
+  for (;;)
   {
     if (lfind(head, cs, node->hashnr, node->key, node->keylen,
               &cursor, pins) &&
         (flags & LF_HASH_UNIQUE))
+    {
       res= 0; /* duplicate found */
+      break;
+    }
     else
     {
       node->link= (intptr)cursor.curr;
-      assert(node->link != (intptr)node);
-      assert(cursor.prev != &node->link);
+      DBUG_ASSERT(node->link != (intptr)node); /* no circular references */
+      DBUG_ASSERT(cursor.prev != &node->link); /* no circular references */
       if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
+      {
         res= 1; /* inserted ok */
+        break;
+      }
     }
-  } while (res == -1);
+  }
   _lf_unpin(pins, 0);
   _lf_unpin(pins, 1);
   _lf_unpin(pins, 2);
+  /*
+    Note that cursor.curr is not pinned here and the pointer is unreliable,
+    the object may dissapear anytime. But if it points to a dummy node, the
+    pointer is safe, because dummy nodes are never freed - initialize_bucket()
+    uses this fact.
+  */
   return res ? 0 : cursor.curr;
 }
@@ -183,24 +204,41 @@ static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
                    const byte *key, uint keylen, LF_PINS *pins)
 {
   CURSOR cursor;
-  int res= -1;
+  int res;

-  do
+  for (;;)
   {
     if (!lfind(head, cs, hashnr, key, keylen, &cursor, pins))
-      res= 1;
+    {
+      res= 1; /* not found */
+      break;
+    }
     else
+    {
+      /* mark the node deleted */
       if (my_atomic_casptr((void **)&(cursor.curr->link),
-                           (void **)&cursor.next, 1+(char *)cursor.next))
+                           (void **)&cursor.next,
+                           (void *)(((intptr)cursor.next) | 1)))
       {
+        /* and remove it from the list */
         if (my_atomic_casptr((void **)cursor.prev,
                              (void **)&cursor.curr, cursor.next))
           _lf_alloc_free(pins, cursor.curr);
         else
+        {
+          /*
+            somebody already "helped" us and removed the node ?
+            Let's check if we need to help that someone too!
+            (to ensure the number of "set DELETED flag" actions
+            is equal to the number of "remove from the list" actions)
+          */
           lfind(head, cs, hashnr, key, keylen, &cursor, pins);
+        }
         res= 0;
+        break;
       }
-  } while (res == -1);
+    }
+  }
   _lf_unpin(pins, 0);
   _lf_unpin(pins, 1);
   _lf_unpin(pins, 2);
@@ -226,7 +264,8 @@ static LF_SLIST *lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs,
 {
   CURSOR cursor;
   int res= lfind(head, cs, hashnr, key, keylen, &cursor, pins);
-  if (res) _lf_pin(pins, 2, cursor.curr);
+  if (res)
+    _lf_pin(pins, 2, cursor.curr);
   _lf_unpin(pins, 0);
   _lf_unpin(pins, 1);
   return res ? cursor.curr : 0;
@@ -241,6 +280,11 @@ static inline const byte* hash_key(const LF_HASH *hash,
   return record + hash->key_offset;
 }

+/*
+  compute the hash key value from the raw key.
+  note, that the hash value is limited to 2^31, because we need one
+  bit to distinguish between normal and dummy nodes.
+*/
 static inline uint calc_hash(LF_HASH *hash, const byte *key, uint keylen)
 {
   ulong nr1= 1, nr2= 4;
@@ -249,8 +293,9 @@ static inline uint calc_hash(LF_HASH *hash, const byte *key, uint keylen)
   return nr1 & INT_MAX32;
 }

-#define MAX_LOAD 1.0
-static void initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
+#define MAX_LOAD 1.0    /* average number of elements in a bucket */
+
+static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);

 /*
   Initializes lf_hash, the arguments are compatible with hash_init
@@ -261,7 +306,7 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
 {
   lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
                 offsetof(LF_SLIST, key));
-  lf_dynarray_init(&hash->array, sizeof(LF_SLIST **));
+  lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
   hash->size= 1;
   hash->count= 0;
   hash->element_size= element_size;
@@ -275,14 +320,19 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,

 void lf_hash_destroy(LF_HASH *hash)
 {
-  LF_SLIST *el= *(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0);
+  LF_SLIST *el, **head= (LF_SLIST **)_lf_dynarray_value(&hash->array, 0);
+
+  if (unlikely(!head))
+    return;
+  el= *head;
+
   while (el)
   {
     intptr next= el->link;
     if (el->hashnr & 1)
-      lf_alloc_real_free(&hash->alloc, el);
+      lf_alloc_direct_free(&hash->alloc, el); /* normal node */
     else
-      my_free((void *)el, MYF(0));
+      my_free((void *)el, MYF(0)); /* dummy node */
     el= (LF_SLIST *)next;
   }
   lf_alloc_destroy(&hash->alloc);
@@ -297,6 +347,7 @@ void lf_hash_destroy(LF_HASH *hash)
   RETURN
     0 - inserted
     1 - didn't (unique key conflict)
+   -1 - out of memory

   NOTE
     see linsert() for pin usage notes
@@ -308,14 +359,18 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)

   lf_rwlock_by_pins(pins);
   node= (LF_SLIST *)_lf_alloc_new(pins);
+  if (unlikely(!node))
+    return -1;
   memcpy(node+1, data, hash->element_size);
   node->key= hash_key(hash, (byte *)(node+1), &node->keylen);
   hashnr= calc_hash(hash, node->key, node->keylen);
   bucket= hashnr % hash->size;
   el= _lf_dynarray_lvalue(&hash->array, bucket);
-  if (*el == NULL)
-    initialize_bucket(hash, el, bucket, pins);
-  node->hashnr= my_reverse_bits(hashnr) | 1;
+  if (unlikely(!el))
+    return -1;
+  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
+    return -1;
+  node->hashnr= my_reverse_bits(hashnr) | 1; /* normal node */
   if (linsert(el, hash->charset, node, pins, hash->flags))
   {
     _lf_alloc_free(pins, node);
@@ -330,9 +385,14 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
 }

 /*
+  DESCRIPTION
+    deletes an element with the given key from the hash (if a hash is
+    not unique and there're many elements with this key - the "first"
+    matching element is deleted)
   RETURN
     0 - deleted
     1 - didn't (not found)
+   -1 - out of memory
   NOTE
     see ldelete() for pin usage notes
 */
@@ -344,8 +404,16 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
   bucket= hashnr % hash->size;
   lf_rwlock_by_pins(pins);
   el= _lf_dynarray_lvalue(&hash->array, bucket);
-  if (*el == NULL)
-    initialize_bucket(hash, el, bucket, pins);
+  if (unlikely(!el))
+    return -1;
+  /*
+    note that we still need to initialize_bucket here,
+    we cannot return "node not found", because an old bucket of that
+    node may've been split and the node was assigned to a new bucket
+    that was never accessed before and thus is not initialized.
+  */
+  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
+    return -1;
   if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
               (byte *)key, keylen, pins))
   {
@@ -358,6 +426,12 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
 }

 /*
+  RETURN
+    a pointer to an element with the given key (if a hash is not unique and
+    there're many elements with this key - the "first" matching element)
+    NULL if nothing is found
+    MY_ERRPTR if OOM
+
   NOTE
     see lsearch() for pin usage notes
 */
@@ -369,32 +443,51 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
   bucket= hashnr % hash->size;
   lf_rwlock_by_pins(pins);
   el= _lf_dynarray_lvalue(&hash->array, bucket);
-  if (*el == NULL)
-    initialize_bucket(hash, el, bucket, pins);
+  if (unlikely(!el))
+    return MY_ERRPTR;
+  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
+    return MY_ERRPTR;
   found= lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
                  (byte *)key, keylen, pins);
   lf_rwunlock_by_pins(pins);
   return found ? found+1 : 0;
 }

-static const char *dummy_key= "";
+static const byte *dummy_key= "";

-static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
+/*
+  RETURN
+    0 - ok
+   -1 - out of memory
+*/
+static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
                               uint bucket, LF_PINS *pins)
 {
   uint parent= my_clear_highest_bit(bucket);
   LF_SLIST *dummy= (LF_SLIST *)my_malloc(sizeof(LF_SLIST), MYF(MY_WME));
   LF_SLIST **tmp= 0, *cur;
   LF_SLIST * volatile *el= _lf_dynarray_lvalue(&hash->array, parent);
-  if (*el == NULL && bucket)
-    initialize_bucket(hash, el, parent, pins);
-  dummy->hashnr= my_reverse_bits(bucket);
+  if (unlikely(!el || !dummy))
+    return -1;
+  if (*el == NULL && bucket &&
+      unlikely(initialize_bucket(hash, el, parent, pins)))
+    return -1;
+  dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */
   dummy->key= (char*) dummy_key;
   dummy->keylen= 0;
-  if ((cur= linsert(el, hash->charset, dummy, pins, 0)))
+  if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE)))
   {
     my_free((void *)dummy, MYF(0));
     dummy= cur;
   }
   my_atomic_casptr((void **)node, (void **)&tmp, dummy);
+  /*
+    note that if the CAS above failed (after linsert() succeeded),
+    it would mean that some other thread has executed linsert() for
+    the same dummy node, its linsert() failed, it picked up our
+    dummy node (in "dummy= cur") and executed the same CAS as above.
+    Which means that even if CAS above failed we don't need to retry,
+    and we should not free(dummy) - there's no memory leak here
+  */
+  return 0;
 }
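
Note: the pinning protocol documented at the top of lf_alloc-pin.c can be illustrated
with a short caller-side sketch. This is not part of the patch: MY_NODE, share and
read_payload are hypothetical names, and the sketch assumes the in-tree <lf.h>
declarations of LF_PINS, _lf_pin() and _lf_unpin() that the code above uses.

#include <my_global.h>
#include <lf.h>

typedef struct my_node { struct my_node *next; int payload; } MY_NODE;

/* read 'payload' from whatever node is currently published in *share */
static int read_payload(MY_NODE * volatile *share, LF_PINS *pins)
{
  MY_NODE *local;
  int value;
  do
  {
    local= *share;                      /* step 2: take a local copy of the pointer */
    _lf_pin(pins, 0, local);            /* step 3: pin it ...                        */
  } while (local != *share);            /* ... and verify the shared pointer again   */
  value= local ? local->payload : 0;    /* pinned, so it is safe to dereference      */
  _lf_unpin(pins, 0);                   /* release the pin when done                 */
  return value;
}

The re-check in the loop is exactly the point made in note 3 of the comment: pinning a
stale copy protects nothing, so the thread must confirm it pinned what PTR still points to.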
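
Note: the new error returns in lf_hash.c (-1 from lf_hash_insert()/lf_hash_delete(),
MY_ERRPTR from lf_hash_search()) change what callers must handle on out-of-memory.
A hedged usage sketch follows; MY_ELEM, store_elem and find_elem are made-up names,
and the element/key layout is an assumption for illustration only.

#include <my_global.h>
#include <my_sys.h>
#include <lf.h>

typedef struct { uint id; uint value; } MY_ELEM;  /* hypothetical element, keyed on 'id' */

/* returns 0 - stored, 1 - duplicate key (LF_HASH_UNIQUE), -1 - out of memory */
static int store_elem(LF_HASH *hash, LF_PINS *pins, const MY_ELEM *elem)
{
  return lf_hash_insert(hash, pins, elem);        /* -1 is the new OOM return */
}

/* returns the found element (left pinned with pin 2) or NULL */
static MY_ELEM *find_elem(LF_HASH *hash, LF_PINS *pins, uint id)
{
  void *res= lf_hash_search(hash, pins, &id, sizeof(id));
  if (res == MY_ERRPTR)                           /* a bucket could not be initialized */
    return NULL;
  return (MY_ELEM *)res;                          /* may be NULL if the key is absent  */
}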