author     unknown <guilhem@gbichot3.local>    2007-02-28 17:50:51 +0100
committer  unknown <guilhem@gbichot3.local>    2007-02-28 17:50:51 +0100
commit     ea57b3d4a066a5507a7e322b53e3acab24a2855e (patch)
tree       a0703039f00da454a2a91c8b14835d45a6146209 /mysys
parent     ae72e394502e13b854b6e9bb00889fa6b69a7ed9 (diff)
parent     fdf847fb62a0fcdf0edf25d6c8654b19eaa9a9ad (diff)
download   mariadb-git-ea57b3d4a066a5507a7e322b53e3acab24a2855e.tar.gz
Merge gbichot3.local:/home/mysql_src/mysql-5.1-for-maria
into gbichot3.local:/home/mysql_src/mysql-maria BitKeeper/etc/ignore: auto-union BUILD/SETUP.sh: Auto merged client/mysqldump.c: Auto merged config/ac-macros/plugins.m4: Auto merged configure.in: Auto merged include/Makefile.am: Auto merged include/atomic/nolock.h: Auto merged include/atomic/rwlock.h: Auto merged include/atomic/x86-gcc.h: Auto merged include/atomic/x86-msvc.h: Auto merged include/ft_global.h: Auto merged include/keycache.h: Auto merged include/m_string.h: Auto merged include/my_atomic.h: Auto merged include/my_base.h: Auto merged include/my_dbug.h: Auto merged include/my_global.h: Auto merged include/my_handler.h: Auto merged include/my_sys.h: Auto merged include/myisam.h: Auto merged libmysql/CMakeLists.txt: Auto merged libmysqld/Makefile.am: Auto merged mysql-test/mysql-test-run.pl: Auto merged mysql-test/r/events_logs_tests.result: Auto merged mysql-test/t/events_logs_tests.test: Auto merged mysys/Makefile.am: Auto merged mysys/array.c: Auto merged mysys/mf_keycache.c: Auto merged mysys/mf_keycaches.c: Auto merged mysys/my_atomic.c: Auto merged mysys/my_bit.c: Auto merged mysys/my_bitmap.c: Auto merged mysys/my_create.c: Auto merged mysys/my_delete.c: Auto merged mysys/my_getsystime.c: Auto merged mysys/my_handler.c: Auto merged mysys/my_init.c: Auto merged mysys/my_open.c: Auto merged mysys/my_pread.c: Auto merged mysys/my_rename.c: Auto merged mysys/my_symlink.c: Auto merged mysys/my_sync.c: Auto merged plugin/daemon_example/daemon_example.cc: Auto merged sql/Makefile.am: Auto merged sql/filesort.cc: Auto merged sql/gen_lex_hash.cc: Auto merged sql/ha_ndbcluster.cc: Auto merged sql/handler.h: Auto merged sql/item_func.cc: Auto merged sql/item_func.h: Auto merged sql/log.cc: Auto merged sql/mysql_priv.h: Auto merged sql/set_var.h: Auto merged sql/sql_class.h: Auto merged sql/sql_parse.cc: Auto merged sql/sql_select.cc: Auto merged sql/sql_sort.h: Auto merged sql/sql_test.cc: Auto merged sql/uniques.cc: Auto merged sql/unireg.cc: Auto merged storage/Makefile.am: Auto merged storage/csv/ha_tina.cc: Auto merged storage/myisam/Makefile.am: Auto merged storage/myisam/ft_boolean_search.c: Auto merged storage/myisam/ft_nlq_search.c: Auto merged storage/myisam/ft_parser.c: Auto merged storage/myisam/ft_static.c: Auto merged storage/myisam/ft_stopwords.c: Auto merged storage/myisam/ft_update.c: Auto merged storage/myisam/fulltext.h: Auto merged storage/myisam/ha_myisam.h: Auto merged storage/myisam/mi_check.c: Auto merged storage/myisam/mi_create.c: Auto merged storage/myisam/mi_delete.c: Auto merged storage/myisam/mi_delete_all.c: Auto merged storage/myisam/mi_dynrec.c: Auto merged storage/myisam/mi_key.c: Auto merged storage/myisam/mi_log.c: Auto merged storage/myisam/mi_open.c: Auto merged storage/myisam/mi_packrec.c: Auto merged storage/myisam/mi_range.c: Auto merged storage/myisam/mi_rsamepos.c: Auto merged storage/myisam/mi_search.c: Auto merged storage/myisam/mi_test1.c: Auto merged storage/myisam/mi_test2.c: Auto merged storage/myisam/mi_unique.c: Auto merged storage/myisam/mi_update.c: Auto merged storage/myisam/mi_write.c: Auto merged storage/myisam/myisamchk.c: Auto merged storage/myisam/myisamlog.c: Auto merged storage/myisam/myisampack.c: Auto merged storage/myisam/rt_index.c: Auto merged storage/myisam/sort.c: Auto merged storage/myisammrg/ha_myisammrg.h: Auto merged unittest/mytap/tap.c: Auto merged mysql-test/r/view.result: manual merge mysql-test/t/view.test: manual merge Makefile.am: manual merge mysql-test/t/disabled.def: manual merge sql/mysqld.cc: manual 
merge sql/set_var.cc: manual merge sql/udf_example.c: manual merge storage/myisam/ha_myisam.cc: manual merge storage/myisam/myisamdef.h: manual merge storage/ndb/src/mgmapi/mgmapi.cpp: manual merge unittest/Makefile.am: manual merge unittest/mysys/Makefile.am: manual merge unittest/mysys/my_atomic-t.c: manual merge
Diffstat (limited to 'mysys')
-rw-r--r--  mysys/Makefile.am          7
-rw-r--r--  mysys/array.c             56
-rw-r--r--  mysys/lf_alloc-pin.c     443
-rw-r--r--  mysys/lf_dynarray.c      204
-rw-r--r--  mysys/lf_hash.c          400
-rw-r--r--  mysys/mf_keycache.c       24
-rw-r--r--  mysys/mf_keycaches.c      14
-rwxr-xr-x  mysys/mf_pagecache.c    4102
-rw-r--r--  mysys/my_atomic.c          9
-rw-r--r--  mysys/my_bit.c           100
-rw-r--r--  mysys/my_bitmap.c          1
-rw-r--r--  mysys/my_create.c          7
-rw-r--r--  mysys/my_delete.c          3
-rw-r--r--  mysys/my_getsystime.c      4
-rw-r--r--  mysys/my_handler.c        23
-rw-r--r--  mysys/my_init.c            1
-rw-r--r--  mysys/my_open.c            1
-rw-r--r--  mysys/my_pread.c           9
-rw-r--r--  mysys/my_rename.c         16
-rw-r--r--  mysys/my_symlink.c         2
-rw-r--r--  mysys/my_sync.c           79
-rw-r--r--  mysys/wqueue.c           167
22 files changed, 5551 insertions, 121 deletions
diff --git a/mysys/Makefile.am b/mysys/Makefile.am
index fd6a0f76332..b02e0f84ec1 100644
--- a/mysys/Makefile.am
+++ b/mysys/Makefile.am
@@ -30,7 +30,8 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \
mf_tempdir.c my_lock.c mf_brkhant.c my_alarm.c \
my_malloc.c my_realloc.c my_once.c mulalloc.c \
my_alloc.c safemalloc.c my_new.cc \
- my_vle.c my_atomic.c \
+ my_vle.c my_atomic.c lf_hash.c \
+ lf_dynarray.c lf_alloc-pin.c \
my_fopen.c my_fstream.c my_getsystime.c \
my_error.c errors.c my_div.c my_messnc.c \
mf_format.c mf_same.c mf_dirname.c mf_fn_ext.c \
@@ -52,7 +53,8 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \
my_gethostbyname.c rijndael.c my_aes.c sha1.c \
my_handler.c my_netware.c my_largepage.c \
my_memmem.c \
- my_windac.c my_access.c base64.c my_libwrap.c
+ my_windac.c my_access.c base64.c my_libwrap.c \
+ mf_pagecache.c wqueue.c
EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \
thr_mutex.c thr_rwlock.c \
CMakeLists.txt mf_soundex.c \
@@ -126,5 +128,6 @@ test_base64$(EXEEXT): base64.c $(LIBRARIES)
$(LINK) $(FLAGS) -DMAIN ./test_base64.c $(LDADD) $(LIBS)
$(RM) -f ./test_base64.c
+
# Don't update the files from bitkeeper
%::SCCS/s.%
diff --git a/mysys/array.c b/mysys/array.c
index 8f4a6087c00..7f909999fe9 100644
--- a/mysys/array.c
+++ b/mysys/array.c
@@ -60,7 +60,8 @@ my_bool init_dynamic_array(DYNAMIC_ARRAY *array, uint element_size,
array->max_element=init_alloc;
array->alloc_increment=alloc_increment;
array->size_of_element=element_size;
- if (!(array->buffer=(char*) my_malloc_ci(element_size*init_alloc,MYF(MY_WME))))
+ if (!(array->buffer=(char*) my_malloc_ci(element_size*init_alloc,
+ MYF(MY_WME))))
{
array->max_element=0;
DBUG_RETURN(TRUE);
@@ -153,7 +154,7 @@ byte *pop_dynamic(DYNAMIC_ARRAY *array)
}
/*
- Replace elemnent in array with given element and index
+ Replace element in array with given element and index
SYNOPSIS
set_dynamic()
@@ -174,19 +175,8 @@ my_bool set_dynamic(DYNAMIC_ARRAY *array, gptr element, uint idx)
{
if (idx >= array->elements)
{
- if (idx >= array->max_element)
- {
- uint size;
- char *new_ptr;
- size=(idx+array->alloc_increment)/array->alloc_increment;
- size*= array->alloc_increment;
- if (!(new_ptr=(char*) my_realloc(array->buffer,size*
- array->size_of_element,
- MYF(MY_WME | MY_ALLOW_ZERO_PTR))))
- return TRUE;
- array->buffer=new_ptr;
- array->max_element=size;
- }
+ if (idx >= array->max_element && allocate_dynamic(array, idx))
+ return TRUE;
bzero((gptr) (array->buffer+array->elements*array->size_of_element),
(idx - array->elements)*array->size_of_element);
array->elements=idx+1;
@@ -196,6 +186,42 @@ my_bool set_dynamic(DYNAMIC_ARRAY *array, gptr element, uint idx)
return FALSE;
}
+
+/*
+ Ensure that dynamic array has enough elements
+
+ SYNOPSIS
+ allocate_dynamic()
+ array
+    max_elements Number of elements that are needed
+
+ NOTES
+    Any newly allocated elements are NOT initialized
+
+ RETURN VALUE
+ FALSE Ok
+ TRUE Allocation of new memory failed
+*/
+
+my_bool allocate_dynamic(DYNAMIC_ARRAY *array, uint max_elements)
+{
+ if (max_elements >= array->max_element)
+ {
+ uint size;
+ char *new_ptr;
+ size= (max_elements + array->alloc_increment)/array->alloc_increment;
+ size*= array->alloc_increment;
+ if (!(new_ptr= (char*) my_realloc(array->buffer,size*
+ array->size_of_element,
+ MYF(MY_WME | MY_ALLOW_ZERO_PTR))))
+ return TRUE;
+ array->buffer= new_ptr;
+ array->max_element= size;
+ }
+ return FALSE;
+}
+
+
/*
Get an element from array by given index
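
The allocate_dynamic() function added above lets a caller grow a DYNAMIC_ARRAY
to a known size in one step, so that a following batch of set_dynamic() calls
never has to reallocate. A minimal usage sketch, assuming the mysys
DYNAMIC_ARRAY API shown in the hunk above; the helper store_ints() and its
arguments are hypothetical:

    #include <my_global.h>
    #include <my_sys.h>

    /* Copy 'count' ints into 'arr', growing it once instead of per element. */
    static my_bool store_ints(DYNAMIC_ARRAY *arr, int *values, uint count)
    {
      uint i;
      if (allocate_dynamic(arr, count))   /* pre-grow; new slots are NOT initialized */
        return TRUE;                      /* allocation failed */
      for (i= 0; i < count; i++)
        if (set_dynamic(arr, (gptr) &values[i], i))
          return TRUE;
      return FALSE;
    }
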
diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c
new file mode 100644
index 00000000000..e964553a64c
--- /dev/null
+++ b/mysys/lf_alloc-pin.c
@@ -0,0 +1,443 @@
+/* QQ: TODO multi-pinbox */
+/* Copyright (C) 2000 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/*
+ wait-free concurrent allocator based on pinning addresses
+
+ It works as follows: every thread (strictly speaking - every CPU, but
+ it's too difficult to do) has a small array of pointers. They're called
+ "pins". Before using an object its address must be stored in this array
+ (pinned). When an object is no longer necessary its address must be
+ removed from this array (unpinned). When a thread wants to free() an
+ object it scans all pins of all threads to see if somebody has this
+ object pinned. If yes - the object is not freed (but stored in a
+ "purgatory"). To reduce the cost of a single free() pins are not scanned
+  on every free(); the object is only added to the (thread-local) purgatory.
+  On every LF_PURGATORY_SIZE-th free() the purgatory is scanned and all
+  unpinned objects are freed.
+
+  Pins are used to solve the ABA problem. To use pins one must obey
+ a pinning protocol:
+ 1. Let's assume that PTR is a shared pointer to an object. Shared means
+ that any thread may modify it anytime to point to a different object
+ and free the old object. Later the freed object may be potentially
+     allocated by another thread. If we're unlucky, another thread may
+     set PTR to point to this object again. This is the ABA problem.
+ 2. Create a local pointer LOCAL_PTR.
+ 3. Pin the PTR in a loop:
+ do
+ {
+ LOCAL_PTR= PTR;
+ pin(PTR, PIN_NUMBER);
+ } while (LOCAL_PTR != PTR)
+ 4. It is guaranteed that after the loop has ended, LOCAL_PTR
+ points to an object (or NULL, if PTR may be NULL), that
+ will never be freed. It is not guaranteed though
+ that LOCAL_PTR == PTR (as PTR can change any time)
+ 5. When done working with the object, remove the pin:
+ unpin(PIN_NUMBER)
+ 6. When copying pins (as in the list traversing loop:
+ pin(CUR, 1);
+ while ()
+ {
+ do // standard
+ { // pinning
+ NEXT=CUR->next; // loop
+ pin(NEXT, 0); // see #3
+ } while (NEXT != CUR->next); // above
+ ...
+ ...
+ CUR=NEXT;
+ pin(CUR, 1); // copy pin[0] to pin[1]
+ }
+     which keeps CUR address constantly pinned), note that pins may be
+ copied only upwards (!!!), that is pin[N] to pin[M], M > N.
+ 7. Don't keep the object pinned longer than necessary - the number of
+ pins you have is limited (and small), keeping an object pinned
+     prevents its reuse and causes unnecessary mallocs.
+
+ Implementation details:
+  Pins are given away from a "pinbox". The pinbox is a stack-based allocator.
+  It uses a dynarray for storing pins; new elements are allocated by the
+  dynarray as necessary, old ones are pushed onto the stack for reuse. ABA is
+  solved by versioning the pointer - because we use an array, a pointer to
+  pins is 32 bit, and the upper 32 bits are used for a version.
+*/
+
+#include <my_global.h>
+#include <my_sys.h>
+#include <lf.h>
+
+#define LF_PINBOX_MAX_PINS 65536
+
+static void _lf_pinbox_real_free(LF_PINS *pins);
+
+/*
+ Initialize a pinbox. Normally called from lf_alloc_init.
+ See the latter for details.
+*/
+void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
+ lf_pinbox_free_func *free_func, void *free_func_arg)
+{
+ DBUG_ASSERT(sizeof(LF_PINS) == 128);
+ DBUG_ASSERT(free_ptr_offset % sizeof(void *) == 0);
+ lf_dynarray_init(&pinbox->pinstack, sizeof(LF_PINS));
+ pinbox->pinstack_top_ver= 0;
+ pinbox->pins_in_stack= 0;
+ pinbox->free_ptr_offset= free_ptr_offset;
+ pinbox->free_func= free_func;
+ pinbox->free_func_arg= free_func_arg;
+}
+
+void lf_pinbox_destroy(LF_PINBOX *pinbox)
+{
+ lf_dynarray_destroy(&pinbox->pinstack);
+}
+
+/*
+ Get pins from a pinbox. Usually called via lf_alloc_get_pins() or
+ lf_hash_get_pins().
+
+ DESCRIPTION
+ get a new LF_PINS structure from a stack of unused pins,
+ or allocate a new one out of dynarray.
+*/
+LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
+{
+ uint32 pins, next, top_ver;
+ LF_PINS *el;
+
+ top_ver= pinbox->pinstack_top_ver;
+ do
+ {
+ if (!(pins= top_ver % LF_PINBOX_MAX_PINS))
+ {
+ pins= my_atomic_add32(&pinbox->pins_in_stack, 1)+1;
+ el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins);
+ break;
+ }
+ el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins);
+ next= el->link;
+ } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
+ top_ver-pins+next+LF_PINBOX_MAX_PINS));
+ el->link= pins;
+ el->purgatory_count= 0;
+ el->pinbox= pinbox;
+ return el;
+}
+
+/*
+ Put pins back to a pinbox. Usually called via lf_alloc_put_pins() or
+ lf_hash_put_pins().
+
+ DESCRIPTION
+ empty the purgatory (XXX deadlock warning below!),
+ push LF_PINS structure to a stack
+*/
+void _lf_pinbox_put_pins(LF_PINS *pins)
+{
+ LF_PINBOX *pinbox= pins->pinbox;
+ uint32 top_ver, nr;
+ nr= pins->link;
+#ifdef MY_LF_EXTRA_DEBUG
+ {
+ int i;
+ for (i= 0; i < LF_PINBOX_PINS; i++)
+ DBUG_ASSERT(pins->pin[i] == 0);
+ }
+#endif
+ /*
+ XXX this will deadlock if other threads will wait for
+ the caller to do something after _lf_pinbox_put_pins(),
+ and they would have pinned addresses that the caller wants to free.
+ Thus: only free pins when all work is done and nobody can wait for you!!!
+ */
+ while (pins->purgatory_count)
+ {
+ _lf_pinbox_real_free(pins);
+ if (pins->purgatory_count)
+ {
+ my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock);
+ pthread_yield();
+ my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock);
+ }
+ }
+ top_ver= pinbox->pinstack_top_ver;
+ if (nr == pinbox->pins_in_stack)
+ {
+ int32 tmp= nr;
+ if (my_atomic_cas32(&pinbox->pins_in_stack, &tmp, tmp-1))
+ goto ret;
+ }
+
+ do
+ {
+ pins->link= top_ver % LF_PINBOX_MAX_PINS;
+ } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
+ top_ver-pins->link+nr+LF_PINBOX_MAX_PINS));
+ret:
+ return;
+}
+
+static int ptr_cmp(void **a, void **b)
+{
+ return *a < *b ? -1 : *a == *b ? 0 : 1;
+}
+
+#define add_to_purgatory(PINS, ADDR) \
+ do \
+ { \
+ *(void **)((char *)(ADDR)+(PINS)->pinbox->free_ptr_offset)= \
+ (PINS)->purgatory; \
+ (PINS)->purgatory= (ADDR); \
+ (PINS)->purgatory_count++; \
+ } while (0)
+
+/*
+ Free an object allocated via pinbox allocator
+
+ DESCRIPTION
+ add an object to purgatory. if necessary, call _lf_pinbox_real_free()
+ to actually free something.
+*/
+void _lf_pinbox_free(LF_PINS *pins, void *addr)
+{
+ add_to_purgatory(pins, addr);
+ if (pins->purgatory_count % LF_PURGATORY_SIZE)
+ _lf_pinbox_real_free(pins);
+}
+
+struct st_harvester {
+ void **granary;
+ int npins;
+};
+
+/*
+ callback for _lf_dynarray_iterate:
+ scan all pins or all threads and accumulate all pins
+*/
+static int harvest_pins(LF_PINS *el, struct st_harvester *hv)
+{
+ int i;
+ LF_PINS *el_end= el+min(hv->npins, LF_DYNARRAY_LEVEL_LENGTH);
+ for (; el < el_end; el++)
+ {
+ for (i= 0; i < LF_PINBOX_PINS; i++)
+ {
+ void *p= el->pin[i];
+ if (p)
+ *hv->granary++= p;
+ }
+ }
+ hv->npins-= LF_DYNARRAY_LEVEL_LENGTH;
+ return 0;
+}
+
+/*
+ callback for _lf_dynarray_iterate:
+ scan all pins or all threads and see if addr is present there
+*/
+static int match_pins(LF_PINS *el, void *addr)
+{
+ int i;
+ LF_PINS *el_end= el+LF_DYNARRAY_LEVEL_LENGTH;
+ for (; el < el_end; el++)
+ for (i= 0; i < LF_PINBOX_PINS; i++)
+ if (el->pin[i] == addr)
+ return 1;
+ return 0;
+}
+
+/*
+  Scan the purgatory and free everything that can be freed
+*/
+static void _lf_pinbox_real_free(LF_PINS *pins)
+{
+ int npins;
+ void *list;
+ void **addr;
+ LF_PINBOX *pinbox= pins->pinbox;
+
+ npins= pinbox->pins_in_stack+1;
+
+#ifdef HAVE_ALLOCA
+ /* create a sorted list of pinned addresses, to speed up searches */
+ if (sizeof(void *)*LF_PINBOX_PINS*npins < my_thread_stack_size)
+ {
+ struct st_harvester hv;
+ addr= (void **) alloca(sizeof(void *)*LF_PINBOX_PINS*npins);
+ hv.granary= addr;
+ hv.npins= npins;
+ /* scan the dynarray and accumulate all pinned addresses */
+ _lf_dynarray_iterate(&pinbox->pinstack,
+ (lf_dynarray_func)harvest_pins, &hv);
+
+ npins= hv.granary-addr;
+ /* and sort them */
+ if (npins)
+ qsort(addr, npins, sizeof(void *), (qsort_cmp)ptr_cmp);
+ }
+ else
+#endif
+ addr= 0;
+
+ list= pins->purgatory;
+ pins->purgatory= 0;
+ pins->purgatory_count= 0;
+ while (list)
+ {
+ void *cur= list;
+ list= *(void **)((char *)cur+pinbox->free_ptr_offset);
+ if (npins)
+ {
+ if (addr) /* use binary search */
+ {
+ void **a, **b, **c;
+ for (a= addr, b= addr+npins-1, c= a+(b-a)/2; b-a>1; c= a+(b-a)/2)
+ if (cur == *c)
+ a= b= c;
+ else if (cur > *c)
+ a= c;
+ else
+ b= c;
+ if (cur == *a || cur == *b)
+ goto found;
+ }
+ else /* no alloca - no cookie. linear search here */
+ {
+ if (_lf_dynarray_iterate(&pinbox->pinstack,
+ (lf_dynarray_func)match_pins, cur))
+ goto found;
+ }
+ }
+ /* not pinned - freeing */
+ pinbox->free_func(cur, pinbox->free_func_arg);
+ continue;
+found:
+ /* pinned - keeping */
+ add_to_purgatory(pins, cur);
+ }
+}
+
+/*
+ callback for _lf_pinbox_real_free to free an unpinned object -
+ add it back to the allocator stack
+*/
+static void alloc_free(struct st_lf_alloc_node *node, LF_ALLOCATOR *allocator)
+{
+ struct st_lf_alloc_node *tmp;
+ tmp= allocator->top;
+ do
+ {
+ node->next= tmp;
+ } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, node) &&
+ LF_BACKOFF);
+}
+
+/* lock-free memory allocator for fixed-size objects */
+
+LF_REQUIRE_PINS(1);
+
+/*
+  initialize the lock-free allocator.
+
+  SYNOPSIS
+ allocator -
+ size a size of an object to allocate
+ free_ptr_offset an offset inside the object to a sizeof(void *)
+ memory that is guaranteed to be unused after
+ the object is put in the purgatory. Unused by ANY
+ thread, not only the purgatory owner.
+*/
+void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
+{
+ lf_pinbox_init(&allocator->pinbox, free_ptr_offset,
+ (lf_pinbox_free_func *)alloc_free, allocator);
+ allocator->top= 0;
+ allocator->mallocs= 0;
+ allocator->element_size= size;
+ DBUG_ASSERT(size >= (int)sizeof(void *));
+ DBUG_ASSERT(free_ptr_offset < size);
+}
+
+/*
+ destroy the allocator, free everything that's in it
+*/
+void lf_alloc_destroy(LF_ALLOCATOR *allocator)
+{
+ struct st_lf_alloc_node *node= allocator->top;
+ while (node)
+ {
+ struct st_lf_alloc_node *tmp= node->next;
+ my_free((void *)node, MYF(0));
+ node= tmp;
+ }
+ lf_pinbox_destroy(&allocator->pinbox);
+ allocator->top= 0;
+}
+
+/*
+  Allocate and return a new object.
+
+ DESCRIPTION
+    Pop an unused object from the stack, or malloc it if the stack is empty.
+ pin[0] is used, it's removed on return.
+*/
+void *_lf_alloc_new(LF_PINS *pins)
+{
+ LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg);
+ struct st_lf_alloc_node *node;
+ for (;;)
+ {
+ do
+ {
+ node= allocator->top;
+ _lf_pin(pins, 0, node);
+ } while (node != allocator->top && LF_BACKOFF);
+ if (!node)
+ {
+ if (!(node= (void *)my_malloc(allocator->element_size,
+ MYF(MY_WME|MY_ZEROFILL))))
+ break;
+#ifdef MY_LF_EXTRA_DEBUG
+ my_atomic_add32(&allocator->mallocs, 1);
+#endif
+ break;
+ }
+ if (my_atomic_casptr((void **)&allocator->top,
+ (void *)&node, *(void **)node))
+ break;
+ }
+ _lf_unpin(pins, 0);
+ return node;
+}
+
+/*
+ count the number of objects in a pool.
+
+ NOTE
+ This is NOT thread-safe !!!
+*/
+uint lf_alloc_in_pool(LF_ALLOCATOR *allocator)
+{
+ uint i;
+ struct st_lf_alloc_node *node;
+ for (node= allocator->top, i= 0; node; node= node->next, i++)
+ /* no op */;
+ return i;
+}
+
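
The pinning protocol from the header comment of lf_alloc-pin.c, reduced to a
minimal caller-side sketch. The shared_top variable and read_top() are
hypothetical; lf_alloc_get_pins() and lf_alloc_put_pins() are the wrappers
named in the comments above and are assumed here to take the allocator and
the pins respectively:

    #include <my_global.h>
    #include <my_sys.h>
    #include <lf.h>

    struct my_node { struct my_node *next; int payload; };

    static struct my_node * volatile shared_top;   /* hypothetical shared pointer */

    static int read_top(LF_ALLOCATOR *allocator)
    {
      LF_PINS *pins= lf_alloc_get_pins(allocator);
      struct my_node *local;
      int res;
      do                               /* standard pinning loop, see step 3 above */
      {
        local= shared_top;
        _lf_pin(pins, 0, local);
      } while (local != shared_top);
      /* 'local' cannot be freed by another thread while pin[0] holds it */
      res= local ? local->payload : -1;
      _lf_unpin(pins, 0);
      lf_alloc_put_pins(pins);         /* only when nobody can be waiting on us */
      return res;
    }
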
diff --git a/mysys/lf_dynarray.c b/mysys/lf_dynarray.c
new file mode 100644
index 00000000000..c6dd654bf03
--- /dev/null
+++ b/mysys/lf_dynarray.c
@@ -0,0 +1,204 @@
+/* Copyright (C) 2000 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/*
+ Analog of DYNAMIC_ARRAY that never reallocs
+ (so no pointer into the array may ever become invalid).
+
+ Memory is allocated in non-contiguous chunks.
+ This data structure is not space efficient for sparse arrays.
+
+ The number of elements is limited to 4311810304
+
+ Every element is aligned to sizeof(element) boundary
+ (to avoid false sharing if element is big enough).
+
+ LF_DYNARRAY is a recursive structure. On the zero level
+ LF_DYNARRAY::level[0] it's an array of LF_DYNARRAY_LEVEL_LENGTH elements,
+ on the first level it's an array of LF_DYNARRAY_LEVEL_LENGTH pointers
+ to arrays of elements, on the second level it's an array of pointers
+ to arrays of pointers to arrays of elements. And so on.
+
+ Actually, it's wait-free, not lock-free ;-)
+*/
+
+#include <my_global.h>
+#include <strings.h>
+#include <my_sys.h>
+#include <lf.h>
+
+void lf_dynarray_init(LF_DYNARRAY *array, uint element_size)
+{
+ bzero(array, sizeof(*array));
+ array->size_of_element= element_size;
+ my_atomic_rwlock_init(&array->lock);
+}
+
+static void recursive_free(void **alloc, int level)
+{
+ if (!alloc)
+ return;
+
+ if (level)
+ {
+ int i;
+ for (i= 0; i < LF_DYNARRAY_LEVEL_LENGTH; i++)
+ recursive_free(alloc[i], level-1);
+ my_free((void *)alloc, MYF(0));
+ }
+ else
+ my_free(alloc[-1], MYF(0));
+}
+
+void lf_dynarray_destroy(LF_DYNARRAY *array)
+{
+ int i;
+ for (i= 0; i < LF_DYNARRAY_LEVELS; i++)
+ recursive_free(array->level[i], i);
+ my_atomic_rwlock_destroy(&array->lock);
+}
+
+static const ulong dynarray_idxes_in_prev_levels[LF_DYNARRAY_LEVELS]=
+{
+  0, /* +1 here to avoid -1's below */
+ LF_DYNARRAY_LEVEL_LENGTH,
+ LF_DYNARRAY_LEVEL_LENGTH * LF_DYNARRAY_LEVEL_LENGTH +
+ LF_DYNARRAY_LEVEL_LENGTH,
+ LF_DYNARRAY_LEVEL_LENGTH * LF_DYNARRAY_LEVEL_LENGTH *
+ LF_DYNARRAY_LEVEL_LENGTH + LF_DYNARRAY_LEVEL_LENGTH *
+ LF_DYNARRAY_LEVEL_LENGTH + LF_DYNARRAY_LEVEL_LENGTH
+};
+
+static const ulong dynarray_idxes_in_prev_level[LF_DYNARRAY_LEVELS]=
+{
+  0, /* +1 here to avoid -1's below */
+ LF_DYNARRAY_LEVEL_LENGTH,
+ LF_DYNARRAY_LEVEL_LENGTH * LF_DYNARRAY_LEVEL_LENGTH,
+ LF_DYNARRAY_LEVEL_LENGTH * LF_DYNARRAY_LEVEL_LENGTH *
+ LF_DYNARRAY_LEVEL_LENGTH,
+};
+
+/*
+ Returns a valid lvalue pointer to the element number 'idx'.
+ Allocates memory if necessary.
+*/
+void *_lf_dynarray_lvalue(LF_DYNARRAY *array, uint idx)
+{
+ void * ptr, * volatile * ptr_ptr= 0;
+ int i;
+
+ for (i= LF_DYNARRAY_LEVELS-1; idx < dynarray_idxes_in_prev_levels[i]; i--)
+ /* no-op */;
+ ptr_ptr= &array->level[i];
+ idx-= dynarray_idxes_in_prev_levels[i];
+ for (; i > 0; i--)
+ {
+ if (!(ptr= *ptr_ptr))
+ {
+ void *alloc= my_malloc(LF_DYNARRAY_LEVEL_LENGTH * sizeof(void *),
+ MYF(MY_WME|MY_ZEROFILL));
+ if (unlikely(!alloc))
+ return(NULL);
+ if (my_atomic_casptr(ptr_ptr, &ptr, alloc))
+ ptr= alloc;
+ else
+ my_free(alloc, MYF(0));
+ }
+ ptr_ptr= ((void **)ptr) + idx / dynarray_idxes_in_prev_level[i];
+ idx%= dynarray_idxes_in_prev_level[i];
+ }
+ if (!(ptr= *ptr_ptr))
+ {
+ void *alloc, *data;
+ alloc= my_malloc(LF_DYNARRAY_LEVEL_LENGTH * array->size_of_element +
+ max(array->size_of_element, sizeof(void *)),
+ MYF(MY_WME|MY_ZEROFILL));
+ if (unlikely(!alloc))
+ return(NULL);
+ /* reserve the space for free() address */
+ data= alloc + sizeof(void *);
+ { /* alignment */
+ intptr mod= ((intptr)data) % array->size_of_element;
+ if (mod)
+ data+= array->size_of_element - mod;
+ }
+ ((void **)data)[-1]= alloc; /* free() will need the original pointer */
+ if (my_atomic_casptr(ptr_ptr, &ptr, data))
+ ptr= data;
+ else
+ my_free(alloc, MYF(0));
+ }
+ return ptr + array->size_of_element * idx;
+}
+
+/*
+ Returns a pointer to the element number 'idx'
+  or NULL if the element does not exist
+*/
+void *_lf_dynarray_value(LF_DYNARRAY *array, uint idx)
+{
+ void * ptr, * volatile * ptr_ptr= 0;
+ int i;
+
+ for (i= LF_DYNARRAY_LEVELS-1; idx < dynarray_idxes_in_prev_levels[i]; i--)
+ /* no-op */;
+ ptr_ptr= &array->level[i];
+ idx-= dynarray_idxes_in_prev_levels[i];
+ for (; i > 0; i--)
+ {
+ if (!(ptr= *ptr_ptr))
+ return(NULL);
+ ptr_ptr= ((void **)ptr) + idx / dynarray_idxes_in_prev_level[i];
+ idx %= dynarray_idxes_in_prev_level[i];
+ }
+ if (!(ptr= *ptr_ptr))
+ return(NULL);
+ return ptr + array->size_of_element * idx;
+}
+
+static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level,
+ lf_dynarray_func func, void *arg)
+{
+ int res, i;
+ if (!ptr)
+ return 0;
+ if (!level)
+ return func(ptr, arg);
+ for (i= 0; i < LF_DYNARRAY_LEVEL_LENGTH; i++)
+ if ((res= recursive_iterate(array, ((void **)ptr)[i], level-1, func, arg)))
+ return res;
+ return 0;
+}
+
+/*
+ Calls func(array, arg) on every array of LF_DYNARRAY_LEVEL_LENGTH elements
+ in lf_dynarray.
+
+ DESCRIPTION
+ lf_dynarray consists of a set of arrays, LF_DYNARRAY_LEVEL_LENGTH elements
+ each. _lf_dynarray_iterate() calls user-supplied function on every array
+ from the set. It is the fastest way to scan the array, faster than
+ for (i=0; i < N; i++) { func(_lf_dynarray_value(dynarray, i)); }
+*/
+int _lf_dynarray_iterate(LF_DYNARRAY *array, lf_dynarray_func func, void *arg)
+{
+ int i, res;
+ for (i= 0; i < LF_DYNARRAY_LEVELS; i++)
+ if ((res= recursive_iterate(array, array->level[i], i, func, arg)))
+ return res;
+ return 0;
+}
+
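
A minimal sketch of the lvalue/value pair above: _lf_dynarray_lvalue()
allocates any missing levels on the way to an element, while
_lf_dynarray_value() only looks it up. A single-threaded caller is assumed,
so the underscore variants are used directly without the atomic-rwlock
wrappers:

    #include <my_global.h>
    #include <my_sys.h>
    #include <lf.h>

    static void dynarray_demo(void)
    {
      LF_DYNARRAY array;
      int *slot;

      lf_dynarray_init(&array, sizeof(int));

      /* lvalue access allocates the chunks needed to reach element 100000 */
      if ((slot= (int *) _lf_dynarray_lvalue(&array, 100000)))
        *slot= 42;

      /* value access never allocates; an index that was never set yields NULL */
      slot= (int *) _lf_dynarray_value(&array, 100000);    /* now *slot == 42 */

      lf_dynarray_destroy(&array);
    }
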
diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c
new file mode 100644
index 00000000000..fb2fb88492f
--- /dev/null
+++ b/mysys/lf_hash.c
@@ -0,0 +1,400 @@
+/* Copyright (C) 2000 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/*
+ extensible hash
+
+ TODO
+ try to get rid of dummy nodes ?
+ for non-unique hash, count only _distinct_ values
+ (but how to do it in lf_hash_delete ?)
+*/
+#include <my_global.h>
+#include <m_string.h>
+#include <my_sys.h>
+#include <my_bit.h>
+#include <lf.h>
+
+LF_REQUIRE_PINS(3);
+
+/* An element of the list */
+typedef struct {
+  intptr volatile link; /* a pointer to the next element in a list and a flag */
+ uint32 hashnr; /* reversed hash number, for sorting */
+ const byte *key;
+ uint keylen;
+} LF_SLIST;
+
+/*
+  a structure to pass the context (pointers to the three successive elements
+ in a list) from lfind to linsert/ldelete
+*/
+typedef struct {
+ intptr volatile *prev;
+ LF_SLIST *curr, *next;
+} CURSOR;
+
+/*
+ the last bit in LF_SLIST::link is a "deleted" flag.
+ the helper macros below convert it to a pure pointer or a pure flag
+*/
+#define PTR(V) (LF_SLIST *)((V) & (~(intptr)1))
+#define DELETED(V) ((V) & 1)
+
+/*
+ DESCRIPTION
+ Search for hashnr/key/keylen in the list starting from 'head' and
+ position the cursor. The list is ORDER BY hashnr, key
+
+ RETURN
+ 0 - not found
+ 1 - found
+
+ NOTE
+ cursor is positioned in either case
+ pins[0..2] are used, they are NOT removed on return
+*/
+static int lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
+ const byte *key, uint keylen, CURSOR *cursor, LF_PINS *pins)
+{
+ uint32 cur_hashnr;
+ const byte *cur_key;
+ uint cur_keylen;
+ intptr link;
+
+retry:
+ cursor->prev= (intptr *)head;
+ do {
+ cursor->curr= PTR(*cursor->prev);
+ _lf_pin(pins, 1, cursor->curr);
+ } while(*cursor->prev != (intptr)cursor->curr && LF_BACKOFF);
+ for (;;)
+ {
+ if (!cursor->curr)
+ return 0;
+ do {
+ /* QQ: XXX or goto retry ? */
+ link= cursor->curr->link;
+ cursor->next= PTR(link);
+ _lf_pin(pins, 0, cursor->next);
+ } while(link != cursor->curr->link && LF_BACKOFF);
+ cur_hashnr= cursor->curr->hashnr;
+ cur_key= cursor->curr->key;
+ cur_keylen= cursor->curr->keylen;
+ if (*cursor->prev != (intptr)cursor->curr)
+ {
+ (void)LF_BACKOFF;
+ goto retry;
+ }
+ if (!DELETED(link))
+ {
+ if (cur_hashnr >= hashnr)
+ {
+ int r= 1;
+ if (cur_hashnr > hashnr ||
+ (r= my_strnncoll(cs, (uchar*) cur_key, cur_keylen, (uchar*) key,
+ keylen)) >= 0)
+ return !r;
+ }
+ cursor->prev= &(cursor->curr->link);
+ _lf_pin(pins, 2, cursor->curr);
+ }
+ else
+ {
+ if (my_atomic_casptr((void **)cursor->prev,
+ (void **)&cursor->curr, cursor->next))
+ _lf_alloc_free(pins, cursor->curr);
+ else
+ {
+ (void)LF_BACKOFF;
+ goto retry;
+ }
+ }
+ cursor->curr= cursor->next;
+ _lf_pin(pins, 1, cursor->curr);
+ }
+}
+
+/*
+ DESCRIPTION
+ insert a 'node' in the list that starts from 'head' in the correct
+ position (as found by lfind)
+
+ RETURN
+ 0 - inserted
+ not 0 - a pointer to a duplicate (not pinned and thus unusable)
+
+ NOTE
+ it uses pins[0..2], on return all pins are removed.
+*/
+static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs,
+ LF_SLIST *node, LF_PINS *pins, uint flags)
+{
+ CURSOR cursor;
+ int res= -1;
+
+ do
+ {
+ if (lfind(head, cs, node->hashnr, node->key, node->keylen,
+ &cursor, pins) &&
+ (flags & LF_HASH_UNIQUE))
+ res= 0; /* duplicate found */
+ else
+ {
+ node->link= (intptr)cursor.curr;
+ assert(node->link != (intptr)node);
+ assert(cursor.prev != &node->link);
+ if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
+ res= 1; /* inserted ok */
+ }
+ } while (res == -1);
+ _lf_unpin(pins, 0);
+ _lf_unpin(pins, 1);
+ _lf_unpin(pins, 2);
+ return res ? 0 : cursor.curr;
+}
+
+/*
+ DESCRIPTION
+    deletes a node as identified by hashnr/key/keylen from the list
+ that starts from 'head'
+
+ RETURN
+ 0 - ok
+ 1 - not found
+
+ NOTE
+ it uses pins[0..2], on return all pins are removed.
+*/
+static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
+ const byte *key, uint keylen, LF_PINS *pins)
+{
+ CURSOR cursor;
+ int res= -1;
+
+ do
+ {
+ if (!lfind(head, cs, hashnr, key, keylen, &cursor, pins))
+ res= 1;
+ else
+ if (my_atomic_casptr((void **)&(cursor.curr->link),
+ (void **)&cursor.next, 1+(char *)cursor.next))
+ {
+ if (my_atomic_casptr((void **)cursor.prev,
+ (void **)&cursor.curr, cursor.next))
+ _lf_alloc_free(pins, cursor.curr);
+ else
+ lfind(head, cs, hashnr, key, keylen, &cursor, pins);
+ res= 0;
+ }
+ } while (res == -1);
+ _lf_unpin(pins, 0);
+ _lf_unpin(pins, 1);
+ _lf_unpin(pins, 2);
+ return res;
+}
+
+/*
+ DESCRIPTION
+    searches for a node as identified by hashnr/key/keylen in the list
+ that starts from 'head'
+
+ RETURN
+ 0 - not found
+ node - found
+
+ NOTE
+ it uses pins[0..2], on return the pin[2] keeps the node found
+ all other pins are removed.
+*/
+static LF_SLIST *lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs,
+ uint32 hashnr, const byte *key, uint keylen,
+ LF_PINS *pins)
+{
+ CURSOR cursor;
+ int res= lfind(head, cs, hashnr, key, keylen, &cursor, pins);
+ if (res) _lf_pin(pins, 2, cursor.curr);
+ _lf_unpin(pins, 0);
+ _lf_unpin(pins, 1);
+ return res ? cursor.curr : 0;
+}
+
+static inline const byte* hash_key(const LF_HASH *hash,
+ const byte *record, uint *length)
+{
+ if (hash->get_key)
+ return (*hash->get_key)(record, length, 0);
+ *length= hash->key_length;
+ return record + hash->key_offset;
+}
+
+static inline uint calc_hash(LF_HASH *hash, const byte *key, uint keylen)
+{
+ ulong nr1= 1, nr2= 4;
+ hash->charset->coll->hash_sort(hash->charset, (uchar*) key, keylen,
+ &nr1, &nr2);
+ return nr1 & INT_MAX32;
+}
+
+#define MAX_LOAD 1.0
+static void initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
+
+/*
+ Initializes lf_hash, the arguments are compatible with hash_init
+*/
+void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
+ uint key_offset, uint key_length, hash_get_key get_key,
+ CHARSET_INFO *charset)
+{
+ lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
+ offsetof(LF_SLIST, key));
+ lf_dynarray_init(&hash->array, sizeof(LF_SLIST **));
+ hash->size= 1;
+ hash->count= 0;
+ hash->element_size= element_size;
+ hash->flags= flags;
+ hash->charset= charset ? charset : &my_charset_bin;
+ hash->key_offset= key_offset;
+ hash->key_length= key_length;
+ hash->get_key= get_key;
+ DBUG_ASSERT(get_key ? !key_offset && !key_length : key_length);
+}
+
+void lf_hash_destroy(LF_HASH *hash)
+{
+ LF_SLIST *el= *(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0);
+ while (el)
+ {
+ intptr next= el->link;
+ if (el->hashnr & 1)
+ lf_alloc_real_free(&hash->alloc, el);
+ else
+ my_free((void *)el, MYF(0));
+ el= (LF_SLIST *)next;
+ }
+ lf_alloc_destroy(&hash->alloc);
+ lf_dynarray_destroy(&hash->array);
+}
+
+/*
+ DESCRIPTION
+    inserts a new element into a hash. It will have a _copy_ of
+ data, not a pointer to it.
+
+ RETURN
+ 0 - inserted
+ 1 - didn't (unique key conflict)
+
+ NOTE
+ see linsert() for pin usage notes
+*/
+int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
+{
+ int csize, bucket, hashnr;
+ LF_SLIST *node, * volatile *el;
+
+ lf_rwlock_by_pins(pins);
+ node= (LF_SLIST *)_lf_alloc_new(pins);
+ memcpy(node+1, data, hash->element_size);
+ node->key= hash_key(hash, (byte *)(node+1), &node->keylen);
+ hashnr= calc_hash(hash, node->key, node->keylen);
+ bucket= hashnr % hash->size;
+ el= _lf_dynarray_lvalue(&hash->array, bucket);
+ if (*el == NULL)
+ initialize_bucket(hash, el, bucket, pins);
+ node->hashnr= my_reverse_bits(hashnr) | 1;
+ if (linsert(el, hash->charset, node, pins, hash->flags))
+ {
+ _lf_alloc_free(pins, node);
+ lf_rwunlock_by_pins(pins);
+ return 1;
+ }
+ csize= hash->size;
+ if ((my_atomic_add32(&hash->count, 1)+1.0) / csize > MAX_LOAD)
+ my_atomic_cas32(&hash->size, &csize, csize*2);
+ lf_rwunlock_by_pins(pins);
+ return 0;
+}
+
+/*
+ RETURN
+ 0 - deleted
+ 1 - didn't (not found)
+ NOTE
+ see ldelete() for pin usage notes
+*/
+int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
+{
+ LF_SLIST * volatile *el;
+ uint bucket, hashnr= calc_hash(hash, (byte *)key, keylen);
+
+ bucket= hashnr % hash->size;
+ lf_rwlock_by_pins(pins);
+ el= _lf_dynarray_lvalue(&hash->array, bucket);
+ if (*el == NULL)
+ initialize_bucket(hash, el, bucket, pins);
+ if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
+ (byte *)key, keylen, pins))
+ {
+ lf_rwunlock_by_pins(pins);
+ return 1;
+ }
+ my_atomic_add32(&hash->count, -1);
+ lf_rwunlock_by_pins(pins);
+ return 0;
+}
+
+/*
+ NOTE
+ see lsearch() for pin usage notes
+*/
+void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
+{
+ LF_SLIST * volatile *el, *found;
+ uint bucket, hashnr= calc_hash(hash, (byte *)key, keylen);
+
+ bucket= hashnr % hash->size;
+ lf_rwlock_by_pins(pins);
+ el= _lf_dynarray_lvalue(&hash->array, bucket);
+ if (*el == NULL)
+ initialize_bucket(hash, el, bucket, pins);
+ found= lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
+ (byte *)key, keylen, pins);
+ lf_rwunlock_by_pins(pins);
+ return found ? found+1 : 0;
+}
+
+static const char *dummy_key= "";
+
+static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
+ uint bucket, LF_PINS *pins)
+{
+ uint parent= my_clear_highest_bit(bucket);
+ LF_SLIST *dummy= (LF_SLIST *)my_malloc(sizeof(LF_SLIST), MYF(MY_WME));
+ LF_SLIST **tmp= 0, *cur;
+ LF_SLIST * volatile *el= _lf_dynarray_lvalue(&hash->array, parent);
+ if (*el == NULL && bucket)
+ initialize_bucket(hash, el, parent, pins);
+ dummy->hashnr= my_reverse_bits(bucket);
+ dummy->key= (char*) dummy_key;
+ dummy->keylen= 0;
+ if ((cur= linsert(el, hash->charset, dummy, pins, 0)))
+ {
+ my_free((void *)dummy, MYF(0));
+ dummy= cur;
+ }
+ my_atomic_casptr((void **)node, (void **)&tmp, dummy);
+}
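
A minimal end-to-end sketch of the LF_HASH API added above, with a fixed-size
key at offset 0 and unique keys. lf_hash_get_pins() and lf_hash_put_pins()
are the wrappers named in the pinbox comments and are assumed here to take
the hash and the pins respectively; error handling is left out:

    #include <my_global.h>
    #include <my_sys.h>
    #include <lf.h>

    typedef struct { uint32 id; uint32 value; } ENTRY;

    static void lf_hash_demo(void)
    {
      LF_HASH hash;
      LF_PINS *pins;
      ENTRY entry= { 1, 100 }, *found;

      /* key is the leading uint32 'id'; LF_HASH_UNIQUE rejects duplicates */
      lf_hash_init(&hash, sizeof(ENTRY), LF_HASH_UNIQUE,
                   0, sizeof(uint32), NULL, NULL);
      pins= lf_hash_get_pins(&hash);

      lf_hash_insert(&hash, pins, &entry);     /* the hash stores a _copy_ of entry */

      found= (ENTRY *) lf_hash_search(&hash, pins, &entry.id, sizeof(uint32));
      if (found)                               /* kept alive by pin[2], see lsearch() */
        lf_hash_delete(&hash, pins, &found->id, sizeof(uint32));

      lf_hash_put_pins(pins);
      lf_hash_destroy(&hash);
    }
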
diff --git a/mysys/mf_keycache.c b/mysys/mf_keycache.c
index 86394fec239..95a9f08a07a 100644
--- a/mysys/mf_keycache.c
+++ b/mysys/mf_keycache.c
@@ -42,6 +42,7 @@
#include <keycache.h>
#include "my_static.h"
#include <m_string.h>
+#include <my_bit.h>
#include <errno.h>
#include <stdarg.h>
@@ -1009,12 +1010,12 @@ static void unlink_block(KEY_CACHE *keycache, BLOCK_LINK *block)
KEYCACHE_THREAD_TRACE("unlink_block");
#if defined(KEYCACHE_DEBUG)
+ KEYCACHE_DBUG_ASSERT(keycache->blocks_available != 0);
keycache->blocks_available--;
KEYCACHE_DBUG_PRINT("unlink_block",
("unlinked block %u status=%x #requests=%u #available=%u",
BLOCK_NUMBER(block), block->status,
block->requests, keycache->blocks_available));
- KEYCACHE_DBUG_ASSERT(keycache->blocks_available >= 0);
#endif
}
@@ -1643,9 +1644,9 @@ restart:
KEYCACHE_DBUG_ASSERT(page_status != -1);
*page_st=page_status;
KEYCACHE_DBUG_PRINT("find_key_block",
- ("fd: %d pos: %lu block->status: %u page_status: %u",
+ ("fd: %d pos: %lu block->status: %u page_status: %d",
file, (ulong) filepos, block->status,
- (uint) page_status));
+ page_status));
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
DBUG_EXECUTE("check_keycache2",
@@ -1793,8 +1794,6 @@ byte *key_cache_read(KEY_CACHE *keycache,
uint offset= 0;
byte *start= buff;
DBUG_ENTER("key_cache_read");
- DBUG_PRINT("enter", ("fd: %u pos: %lu length: %u",
- (uint) file, (ulong) filepos, length));
if (keycache->can_be_used)
{
@@ -1804,6 +1803,11 @@ byte *key_cache_read(KEY_CACHE *keycache,
uint status;
int page_st;
+ DBUG_PRINT("enter", ("fd: %u pos: %lu page: %lu length: %u",
+ (uint) file, (ulong) filepos,
+ (ulong) (filepos / keycache->key_cache_block_size),
+ length));
+
offset= (uint) (filepos & (keycache->key_cache_block_size-1));
/* Read data in key_cache_block_size increments */
do
@@ -2055,10 +2059,6 @@ int key_cache_write(KEY_CACHE *keycache,
reg1 BLOCK_LINK *block;
int error=0;
DBUG_ENTER("key_cache_write");
- DBUG_PRINT("enter",
- ("fd: %u pos: %lu length: %u block_length: %u key_block_length: %u",
- (uint) file, (ulong) filepos, length, block_length,
- keycache ? keycache->key_cache_block_size : 0));
if (!dont_write)
{
@@ -2080,6 +2080,12 @@ int key_cache_write(KEY_CACHE *keycache,
int page_st;
uint offset;
+ DBUG_PRINT("enter",
+ ("fd: %u pos: %lu page: %lu length: %u block_length: %u",
+ (uint) file, (ulong) filepos,
+ (ulong) (filepos / keycache->key_cache_block_size),
+ length, block_length));
+
offset= (uint) (filepos & (keycache->key_cache_block_size-1));
do
{
diff --git a/mysys/mf_keycaches.c b/mysys/mf_keycaches.c
index 51ad54159e5..3f40f4f4010 100644
--- a/mysys/mf_keycaches.c
+++ b/mysys/mf_keycaches.c
@@ -147,7 +147,8 @@ static void safe_hash_free(SAFE_HASH *hash)
Return the value stored for a key or default value if no key
*/
-static byte *safe_hash_search(SAFE_HASH *hash, const byte *key, uint length)
+static byte *safe_hash_search(SAFE_HASH *hash, const byte *key, uint length,
+ byte *def)
{
byte *result;
DBUG_ENTER("safe_hash_search");
@@ -155,7 +156,7 @@ static byte *safe_hash_search(SAFE_HASH *hash, const byte *key, uint length)
result= hash_search(&hash->hash, key, length);
rw_unlock(&hash->mutex);
if (!result)
- result= hash->default_value;
+ result= def;
else
result= ((SAFE_HASH_ENTRY*) result)->data;
DBUG_PRINT("exit",("data: 0x%lx", (long) result));
@@ -315,6 +316,7 @@ void multi_keycache_free(void)
multi_key_cache_search()
key key to find (usually table path)
uint length Length of key.
+ def Default value if no key cache
NOTES
This function is coded in such a way that we will return the
@@ -325,11 +327,13 @@ void multi_keycache_free(void)
key cache to use
*/
-KEY_CACHE *multi_key_cache_search(byte *key, uint length)
+KEY_CACHE *multi_key_cache_search(byte *key, uint length,
+ KEY_CACHE *def)
{
if (!key_cache_hash.hash.records)
- return dflt_key_cache;
- return (KEY_CACHE*) safe_hash_search(&key_cache_hash, key, length);
+ return def;
+ return (KEY_CACHE*) safe_hash_search(&key_cache_hash, key, length,
+ (void*) def);
}
diff --git a/mysys/mf_pagecache.c b/mysys/mf_pagecache.c
new file mode 100755
index 00000000000..1b9d48c80e6
--- /dev/null
+++ b/mysys/mf_pagecache.c
@@ -0,0 +1,4102 @@
+/* Copyright (C) 2000-2006 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/*
+  These functions handle page caching for Maria tables.
+
+ One cache can handle many files.
+ It must contain buffers of the same blocksize.
+ init_pagecache() should be used to init cache handler.
+
+ The free list (free_block_list) is a stack like structure.
+ When a block is freed by free_block(), it is pushed onto the stack.
+  When a new block is required, we first try to pop one from the stack.
+  If the stack is empty, we try to get a never-used block from the pool.
+  If this is empty too, then a block is taken from the LRU ring, flushing it
+  to disk if necessary. This is handled in find_block().
+ With the new free list, the blocks can have three temperatures:
+ hot, warm and cold (which is free). This is remembered in the block header
+ by the enum BLOCK_TEMPERATURE temperature variable. Remembering the
+ temperature is necessary to correctly count the number of warm blocks,
+ which is required to decide when blocks are allowed to become hot. Whenever
+ a block is inserted to another (sub-)chain, we take the old and new
+ temperature into account to decide if we got one more or less warm block.
+ blocks_unused is the sum of never used blocks in the pool and of currently
+ free blocks. blocks_used is the number of blocks fetched from the pool and
+ as such gives the maximum number of in-use blocks at any time.
+*/
+
+#include "mysys_priv.h"
+#include <m_string.h>
+#include <pagecache.h>
+#include "my_static.h"
+#include <my_bit.h>
+#include <errno.h>
+#include <stdarg.h>
+
+/*
+ Some compilation flags have been added specifically for this module
+ to control the following:
+  - not to let a thread yield control when reading directly
+ from page cache, which might improve performance in many cases;
+ to enable this add:
+ #define SERIALIZED_READ_FROM_CACHE
+ - to set an upper bound for number of threads simultaneously
+ using the page cache; this setting helps to determine an optimal
+ size for hash table and improve performance when the number of
+    blocks in the page cache is much less than the number of threads
+ accessing it;
+ to set this number equal to <N> add
+ #define MAX_THREADS <N>
+  - to replace calls of pthread_cond_wait with calls of
+    pthread_cond_timedwait (wait with timeout set up);
+ this setting should be used only when you want to trap a deadlock
+ situation, which theoretically should not happen;
+ to set timeout equal to <T> seconds add
+ #define PAGECACHE_TIMEOUT <T>
+ - to enable the module traps and to send debug information from
+ page cache module to a special debug log add:
+ #define PAGECACHE_DEBUG
+ the name of this debug log file <LOG NAME> can be set through:
+ #define PAGECACHE_DEBUG_LOG <LOG NAME>
+ if the name is not defined, it's set by default;
+ if the PAGECACHE_DEBUG flag is not set up and we are in a debug
+ mode, i.e. when ! defined(DBUG_OFF), the debug information from the
+ module is sent to the regular debug log.
+
+ Example of the settings:
+ #define SERIALIZED_READ_FROM_CACHE
+ #define MAX_THREADS 100
+ #define PAGECACHE_TIMEOUT 1
+ #define PAGECACHE_DEBUG
+ #define PAGECACHE_DEBUG_LOG "my_pagecache_debug.log"
+*/
+
+/*
+  In the key cache we have external raw locking; here we use
+  SERIALIZED_READ_FROM_CACHE to avoid the problem of reading
+  inconsistent data from the page.
+ (keycache functions (key_cache_read(), key_cache_insert() and
+ key_cache_write()) rely on external MyISAM lock, we don't)
+*/
+#define SERIALIZED_READ_FROM_CACHE yes
+
+#define BLOCK_INFO(B) \
+ DBUG_PRINT("info", \
+ ("block 0x%lx file %lu page %lu s %0x hshL 0x%lx req %u/%u " \
+ "wrlock: %c", \
+ (ulong)(B), \
+ (ulong)((B)->hash_link ? \
+ (B)->hash_link->file.file : \
+ 0), \
+ (ulong)((B)->hash_link ? \
+ (B)->hash_link->pageno : \
+ 0), \
+ (B)->status, \
+ (ulong)(B)->hash_link, \
+ (uint) (B)->requests, \
+ (uint)((B)->hash_link ? \
+ (B)->hash_link->requests : \
+ 0), \
+ ((block->status & BLOCK_WRLOCK)?'Y':'N')))
+
+/* TODO: put it to my_static.c */
+my_bool my_disable_flush_pagecache_blocks= 0;
+
+#define STRUCT_PTR(TYPE, MEMBER, a) \
+ (TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER))
+
+/* types of condition variables */
+#define COND_FOR_REQUESTED 0 /* queue of threads waiting for read operation */
+#define COND_FOR_SAVED 1 /* queue of threads waiting for flush */
+#define COND_FOR_WRLOCK 2 /* queue of write lock */
+#define COND_SIZE 3 /* number of COND_* queues */
+
+typedef pthread_cond_t KEYCACHE_CONDVAR;
+
+/* descriptor of the page in the page cache block buffer */
+struct st_pagecache_page
+{
+  PAGECACHE_FILE file; /* file to which the page belongs */
+ pgcache_page_no_t pageno; /* number of the page in the file */
+};
+
+/* element in the chain of a hash table bucket */
+struct st_pagecache_hash_link
+{
+ struct st_pagecache_hash_link
+ *next, **prev; /* to connect links in the same bucket */
+ struct st_pagecache_block_link
+ *block; /* reference to the block for the page: */
+ PAGECACHE_FILE file; /* from such a file */
+ pgcache_page_no_t pageno; /* this page */
+ uint requests; /* number of requests for the page */
+};
+
+/* simple states of a block */
+#define BLOCK_ERROR 1 /* an error occurred when performing disk i/o */
+#define BLOCK_READ 2 /* the page is in the block buffer */
+#define BLOCK_IN_SWITCH 4 /* block is preparing to read new page */
+#define BLOCK_REASSIGNED 8 /* block does not accept requests for old page */
+#define BLOCK_IN_FLUSH 16 /* block is in flush operation */
+#define BLOCK_CHANGED 32 /* block buffer contains a dirty page */
+#define BLOCK_WRLOCK 64 /* write locked block */
+
+/* page status, returned by find_block */
+#define PAGE_READ 0
+#define PAGE_TO_BE_READ 1
+#define PAGE_WAIT_TO_BE_READ 2
+
+/* block temperature determines in which (sub-)chain the block currently is */
+enum BLOCK_TEMPERATURE { BLOCK_COLD /*free*/ , BLOCK_WARM , BLOCK_HOT };
+
+/* debug info */
+#ifndef DBUG_OFF
+static char *page_cache_page_type_str[]=
+{
+ (char*)"PLAIN",
+ (char*)"LSN"
+};
+static char *page_cache_page_write_mode_str[]=
+{
+ (char*)"DELAY",
+ (char*)"NOW",
+ (char*)"DONE"
+};
+static char *page_cache_page_lock_str[]=
+{
+ (char*)"free -> free ",
+ (char*)"read -> read ",
+ (char*)"write -> write",
+ (char*)"free -> read ",
+ (char*)"free -> write",
+ (char*)"read -> free ",
+ (char*)"write -> free ",
+ (char*)"write -> read "
+};
+static char *page_cache_page_pin_str[]=
+{
+ (char*)"pinned -> pinned ",
+ (char*)"unpinned -> unpinned",
+ (char*)"unpinned -> pinned ",
+ (char*)"pinned -> unpinned"
+};
+#endif
+#ifdef PAGECACHE_DEBUG
+typedef struct st_pagecache_pin_info
+{
+ struct st_pagecache_pin_info *next, **prev;
+ struct st_my_thread_var *thread;
+} PAGECACHE_PIN_INFO;
+/*
+  The st_pagecache_lock_info structure must keep its next, prev and thread
+  members compatible with st_pagecache_pin_info so that both can be handled
+  by the same functions.
+*/
+typedef struct st_pagecache_lock_info
+{
+ struct st_pagecache_lock_info *next, **prev;
+ struct st_my_thread_var *thread;
+ my_bool write_lock;
+} PAGECACHE_LOCK_INFO;
+
+
+/* service functions maintain debugging info about pin & lock */
+
+
+/*
+  Links information about the thread that pinned/locked the block to the list
+
+ SYNOPSIS
+ info_link()
+ list the list to link in
+ node the node which should be linked
+*/
+
+static void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node)
+{
+ if ((node->next= *list))
+ node->next->prev= &(node->next);
+ *list= node;
+ node->prev= list;
+}
+
+
+/*
+  Unlinks information about the thread that pinned/locked the block from the list
+
+ SYNOPSIS
+ info_unlink()
+ node the node which should be unlinked
+*/
+
+static void info_unlink(PAGECACHE_PIN_INFO *node)
+{
+ if ((*node->prev= node->next))
+ node->next->prev= node->prev;
+}
+
+
+/*
+ Finds information about given thread in the list of threads which
+ pinned/locked this block.
+
+ SYNOPSIS
+ info_find()
+ list the list where to find the thread
+ thread thread ID (reference to the st_my_thread_var
+ of the thread)
+
+ RETURN
+ 0 - the thread was not found
+ pointer to the information node of the thread in the list
+*/
+
+static PAGECACHE_PIN_INFO *info_find(PAGECACHE_PIN_INFO *list,
+ struct st_my_thread_var *thread)
+{
+ register PAGECACHE_PIN_INFO *i= list;
+ for(; i != 0; i= i->next)
+ if (i->thread == thread)
+ return i;
+ return 0;
+}
+#endif
+
+/* page cache block */
+struct st_pagecache_block_link
+{
+ struct st_pagecache_block_link
+ *next_used, **prev_used; /* to connect links in the LRU chain (ring) */
+ struct st_pagecache_block_link
+ *next_changed, **prev_changed; /* for lists of file dirty/clean blocks */
+ struct st_pagecache_hash_link
+ *hash_link; /* backward ptr to referring hash_link */
+ WQUEUE
+ wqueue[COND_SIZE]; /* queues on waiting requests for new/old pages */
+ uint requests; /* number of requests for the block */
+ byte *buffer; /* buffer for the block page */
+ uint status; /* state of the block */
+ uint pins; /* pin counter */
+#ifdef PAGECACHE_DEBUG
+ PAGECACHE_PIN_INFO *pin_list;
+ PAGECACHE_LOCK_INFO *lock_list;
+#endif
+ enum BLOCK_TEMPERATURE temperature; /* block temperature: cold, warm, hot */
+ enum pagecache_page_type type; /* type of the block */
+ uint hits_left; /* number of hits left until promotion */
+ ulonglong last_hit_time; /* timestamp of the last hit */
+ LSN rec_lsn; /* LSN when first became dirty */
+ KEYCACHE_CONDVAR *condvar; /* condition variable for 'no readers' event */
+};
+
+#ifdef PAGECACHE_DEBUG
+/* debug checks */
+static my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block,
+ enum pagecache_page_pin mode)
+{
+ struct st_my_thread_var *thread= my_thread_var;
+ PAGECACHE_PIN_INFO *info= info_find(block->pin_list, thread);
+ DBUG_ENTER("info_check_pin");
+ if (info)
+ {
+ if (mode == PAGECACHE_PIN_LEFT_UNPINNED)
+ {
+ DBUG_PRINT("info",
+ ("info_check_pin: thread: 0x%lx block 0x%lx: LEFT_UNPINNED!!!",
+ (ulong)thread, (ulong)block));
+ DBUG_RETURN(1);
+ }
+ else if (mode == PAGECACHE_PIN)
+ {
+ DBUG_PRINT("info",
+ ("info_check_pin: thread: 0x%lx block 0x%lx: PIN!!!",
+ (ulong)thread, (ulong)block));
+ DBUG_RETURN(1);
+ }
+ }
+ else
+ {
+ if (mode == PAGECACHE_PIN_LEFT_PINNED)
+ {
+ DBUG_PRINT("info",
+ ("info_check_pin: thread: 0x%lx block 0x%lx: LEFT_PINNED!!!",
+ (ulong)thread, (ulong)block));
+ DBUG_RETURN(1);
+ }
+ else if (mode == PAGECACHE_UNPIN)
+ {
+ DBUG_PRINT("info",
+ ("info_check_pin: thread: 0x%lx block 0x%lx: UNPIN!!!",
+ (ulong)thread, (ulong)block));
+ DBUG_RETURN(1);
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Debug function which checks current lock/pin state and requested changes
+
+ SYNOPSIS
+ info_check_lock()
+ lock requested lock changes
+ pin requested pin changes
+
+ RETURN
+ 0 - OK
+ 1 - Error
+*/
+
+static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
+ enum pagecache_page_lock lock,
+ enum pagecache_page_pin pin)
+{
+ struct st_my_thread_var *thread= my_thread_var;
+ PAGECACHE_LOCK_INFO *info=
+ (PAGECACHE_LOCK_INFO *) info_find((PAGECACHE_PIN_INFO *) block->lock_list,
+ thread);
+ DBUG_ENTER("info_check_lock");
+ switch(lock)
+ {
+ case PAGECACHE_LOCK_LEFT_UNLOCKED:
+ if (pin != PAGECACHE_PIN_LEFT_UNPINNED ||
+ info)
+ goto error;
+ break;
+ case PAGECACHE_LOCK_LEFT_READLOCKED:
+ if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
+ pin != PAGECACHE_PIN_LEFT_PINNED) ||
+ info == 0 || info->write_lock)
+ goto error;
+ break;
+ case PAGECACHE_LOCK_LEFT_WRITELOCKED:
+ if (pin != PAGECACHE_PIN_LEFT_PINNED ||
+ info == 0 || !info->write_lock)
+ goto error;
+ break;
+ case PAGECACHE_LOCK_READ:
+ if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
+ pin != PAGECACHE_PIN) ||
+ info != 0)
+ goto error;
+ break;
+ case PAGECACHE_LOCK_WRITE:
+ if (pin != PAGECACHE_PIN ||
+ info != 0)
+ goto error;
+ break;
+ case PAGECACHE_LOCK_READ_UNLOCK:
+ if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
+ pin != PAGECACHE_UNPIN) ||
+ info == 0 || info->write_lock)
+ goto error;
+ break;
+ case PAGECACHE_LOCK_WRITE_UNLOCK:
+ if (pin != PAGECACHE_UNPIN ||
+ info == 0 || !info->write_lock)
+ goto error;
+ break;
+ case PAGECACHE_LOCK_WRITE_TO_READ:
+ if ((pin != PAGECACHE_PIN_LEFT_PINNED &&
+ pin != PAGECACHE_UNPIN) ||
+ info == 0 || !info->write_lock)
+ goto error;
+ break;
+ }
+ DBUG_RETURN(0);
+error:
+ DBUG_PRINT("info",
+ ("info_check_lock: thread: 0x%lx block 0x%lx: info: %d wrt: %d,"
+ "to lock: %s, to pin: %s",
+ (ulong)thread, (ulong)block, test(info),
+ (info ? info->write_lock : 0),
+ page_cache_page_lock_str[lock],
+ page_cache_page_pin_str[pin]));
+ DBUG_RETURN(1);
+}
+#endif
+
+#define FLUSH_CACHE 2000 /* sort this many blocks at once */
+
+static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block);
+static void test_key_cache(PAGECACHE *pagecache,
+ const char *where, my_bool lock);
+
+#define PAGECACHE_HASH(p, f, pos) (((ulong) (pos) + \
+ (ulong) (f).file) & (p->hash_entries-1))
+#define FILE_HASH(f) ((uint) (f).file & (PAGECACHE_CHANGED_BLOCKS_HASH - 1))
+
+#define DEFAULT_PAGECACHE_DEBUG_LOG "pagecache_debug.log"
+
+#if defined(PAGECACHE_DEBUG) && ! defined(PAGECACHE_DEBUG_LOG)
+#define PAGECACHE_DEBUG_LOG DEFAULT_PAGECACHE_DEBUG_LOG
+#endif
+
+#if defined(PAGECACHE_DEBUG_LOG)
+static FILE *pagecache_debug_log= NULL;
+static void pagecache_debug_print _VARARGS((const char *fmt, ...));
+#define PAGECACHE_DEBUG_OPEN \
+ if (!pagecache_debug_log) \
+ { \
+ pagecache_debug_log= fopen(PAGECACHE_DEBUG_LOG, "w"); \
+ (void) setvbuf(pagecache_debug_log, NULL, _IOLBF, BUFSIZ); \
+ }
+
+#define PAGECACHE_DEBUG_CLOSE \
+ if (pagecache_debug_log) \
+ { \
+ fclose(pagecache_debug_log); \
+ pagecache_debug_log= 0; \
+ }
+#else
+#define PAGECACHE_DEBUG_OPEN
+#define PAGECACHE_DEBUG_CLOSE
+#endif /* defined(PAGECACHE_DEBUG_LOG) */
+
+#if defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG)
+#define KEYCACHE_DBUG_PRINT(l, m) \
+ { if (pagecache_debug_log) \
+ fprintf(pagecache_debug_log, "%s: ", l); \
+ pagecache_debug_print m; }
+
+#define KEYCACHE_DBUG_ASSERT(a) \
+ { if (! (a) && pagecache_debug_log) \
+ fclose(pagecache_debug_log); \
+ assert(a); }
+#else
+#define KEYCACHE_DBUG_PRINT(l, m) DBUG_PRINT(l, m)
+#define KEYCACHE_DBUG_ASSERT(a) DBUG_ASSERT(a)
+#endif /* defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG) */
+
+#if defined(PAGECACHE_DEBUG) || !defined(DBUG_OFF)
+#ifdef THREAD
+static long pagecache_thread_id;
+#define KEYCACHE_THREAD_TRACE(l) \
+ KEYCACHE_DBUG_PRINT(l,("|thread %ld",pagecache_thread_id))
+
+#define KEYCACHE_THREAD_TRACE_BEGIN(l) \
+ { struct st_my_thread_var *thread_var= my_thread_var; \
+ pagecache_thread_id= thread_var->id; \
+ KEYCACHE_DBUG_PRINT(l,("[thread %ld",pagecache_thread_id)) }
+
+#define KEYCACHE_THREAD_TRACE_END(l) \
+ KEYCACHE_DBUG_PRINT(l,("]thread %ld",pagecache_thread_id))
+#else /* THREAD */
+#define KEYCACHE_THREAD_TRACE(l) KEYCACHE_DBUG_PRINT(l,(""))
+#define KEYCACHE_THREAD_TRACE_BEGIN(l) KEYCACHE_DBUG_PRINT(l,(""))
+#define KEYCACHE_THREAD_TRACE_END(l) KEYCACHE_DBUG_PRINT(l,(""))
+#endif /* THREAD */
+#else
+#define KEYCACHE_THREAD_TRACE_BEGIN(l)
+#define KEYCACHE_THREAD_TRACE_END(l)
+#define KEYCACHE_THREAD_TRACE(l)
+#endif /* defined(PAGECACHE_DEBUG) || !defined(DBUG_OFF) */
+
+#define BLOCK_NUMBER(p, b) \
+ ((uint) (((char*)(b)-(char *) p->block_root)/sizeof(PAGECACHE_BLOCK_LINK)))
+#define PAGECACHE_HASH_LINK_NUMBER(p, h) \
+ ((uint) (((char*)(h)-(char *) p->hash_link_root)/ \
+ sizeof(PAGECACHE_HASH_LINK)))
+
+#if (defined(PAGECACHE_TIMEOUT) && !defined(__WIN__)) || defined(PAGECACHE_DEBUG)
+static int pagecache_pthread_cond_wait(pthread_cond_t *cond,
+ pthread_mutex_t *mutex);
+#else
+#define pagecache_pthread_cond_wait pthread_cond_wait
+#endif
+
+#if defined(PAGECACHE_DEBUG)
+static int ___pagecache_pthread_mutex_lock(pthread_mutex_t *mutex);
+static void ___pagecache_pthread_mutex_unlock(pthread_mutex_t *mutex);
+static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond);
+#define pagecache_pthread_mutex_lock(M) \
+{ DBUG_PRINT("lock", ("mutex lock 0x%lx %u", (ulong)(M), __LINE__)); \
+ ___pagecache_pthread_mutex_lock(M);}
+#define pagecache_pthread_mutex_unlock(M) \
+{ DBUG_PRINT("lock", ("mutex unlock 0x%lx %u", (ulong)(M), __LINE__)); \
+ ___pagecache_pthread_mutex_unlock(M);}
+#define pagecache_pthread_cond_signal(M) \
+{ DBUG_PRINT("lock", ("signal 0x%lx %u", (ulong)(M), __LINE__)); \
+ ___pagecache_pthread_cond_signal(M);}
+#else
+#define pagecache_pthread_mutex_lock pthread_mutex_lock
+#define pagecache_pthread_mutex_unlock pthread_mutex_unlock
+#define pagecache_pthread_cond_signal pthread_cond_signal
+#endif /* defined(PAGECACHE_DEBUG) */
+
+extern my_bool translog_flush(LSN lsn);
+
+/*
+ Write page to the disk
+
+ SYNOPSIS
+ pagecache_fwrite()
+ pagecache - page cache pointer
+ filedesc - pagecache file descriptor structure
+ buffer - buffer to be written
+ pageno - page number in the file
+ type - page type (plain or with LSN)
+ flags - MYF() flags
+
+ RETURN
+ 0 - OK
+ !=0 - Error
+*/
+
+static uint pagecache_fwrite(PAGECACHE *pagecache,
+ PAGECACHE_FILE *filedesc,
+ byte *buffer,
+ pgcache_page_no_t pageno,
+ enum pagecache_page_type type,
+ myf flags)
+{
+ DBUG_ENTER("pagecache_fwrite");
+ if (type == PAGECACHE_LSN_PAGE)
+ {
+ LSN lsn;
+ DBUG_PRINT("info", ("Log handler call"));
+ /* TODO: integrate with page format */
+#define PAGE_LSN_OFFSET 0
+ lsn= lsn_korr(buffer + PAGE_LSN_OFFSET);
+ /*
+ check CONTROL_FILE_IMPOSSIBLE_FILENO &
+ CONTROL_FILE_IMPOSSIBLE_LOG_OFFSET
+ */
+ DBUG_ASSERT(lsn != 0);
+ translog_flush(lsn);
+ }
+ DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size,
+ (pageno)<<(pagecache->shift), flags));
+}
+
+
+/*
+ Read page from the disk
+
+ SYNOPSIS
+ pagecache_fread()
+ pagecache - page cache pointer
+ filedesc - pagecache file descriptor structure
+ buffer - buffer into which the page will be read
+ pageno - page number
+ flags - MYF() flags
+*/
+#define pagecache_fread(pagecache, filedesc, buffer, pageno, flags) \
+ my_pread((filedesc)->file, buffer, pagecache->block_size, \
+ (pageno)<<(pagecache->shift), flags)
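+
+/*
+ Worked example of the offset arithmetic used by pagecache_fwrite() and
+ pagecache_fread() above (values are assumptions, for illustration only):
+ with block_size == 8192, init_pagecache() computes shift == 13, so page
+ number 3 is read from / written to file offset 3 << 13 == 24576; pages are
+ therefore stored back to back at block_size-aligned offsets.
+*/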
+
+
+/*
+ next_power(value) is 2 to the power of (1+floor(log2(value)));
+ e.g. next_power(2)=4, next_power(3)=4.
+*/
+static inline uint next_power(uint value)
+{
+ return (uint) my_round_up_to_next_power((uint32) value) << 1;
+}
+
+
+/*
+ Initialize a page cache
+
+ SYNOPSIS
+ init_pagecache()
+ pagecache pointer to a page cache data structure
+ use_mem total memory to use for the cache
+ division_limit division limit (may be zero)
+ age_threshold age threshold (may be zero)
+ block_size size of block (should be power of 2)
+
+ RETURN VALUE
+ number of blocks in the key cache, if successful,
+ 0 - otherwise.
+
+ NOTES.
+ if pagecache->inited != 0 we assume that the key cache
+ is already initialized. This is for now used by myisamchk, but shouldn't
+ be something that a program should rely on!
+
+ It's assumed that no two threads call this function simultaneously
+ referring to the same key cache handle.
+
+*/
+
+int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem,
+ uint division_limit, uint age_threshold,
+ uint block_size)
+{
+ uint blocks, hash_links, length;
+ int error;
+ DBUG_ENTER("init_pagecache");
+ DBUG_ASSERT(block_size >= 512);
+
+ PAGECACHE_DEBUG_OPEN;
+ if (pagecache->inited && pagecache->disk_blocks > 0)
+ {
+ DBUG_PRINT("warning",("key cache already in use"));
+ DBUG_RETURN(0);
+ }
+
+ pagecache->global_cache_w_requests= pagecache->global_cache_r_requests= 0;
+ pagecache->global_cache_read= pagecache->global_cache_write= 0;
+ pagecache->disk_blocks= -1;
+ if (! pagecache->inited)
+ {
+ pagecache->inited= 1;
+ pagecache->in_init= 0;
+ pthread_mutex_init(&pagecache->cache_lock, MY_MUTEX_INIT_FAST);
+ pagecache->resize_queue.last_thread= NULL;
+ }
+
+ pagecache->mem_size= use_mem;
+ pagecache->block_size= block_size;
+ pagecache->shift= my_bit_log2(block_size);
+ DBUG_PRINT("info", ("block_size: %u",
+ block_size));
+ DBUG_ASSERT(((uint)(1 << pagecache->shift)) == block_size);
+
+ blocks= (int) (use_mem / (sizeof(PAGECACHE_BLOCK_LINK) +
+ 2 * sizeof(PAGECACHE_HASH_LINK) +
+ sizeof(PAGECACHE_HASH_LINK*) *
+ 5/4 + block_size));
+ /* It doesn't make sense to have too few blocks (less than 8) */
+ if (blocks >= 8 && pagecache->disk_blocks < 0)
+ {
+ for ( ; ; )
+ {
+ /* Set hash_entries to the next bigger power of 2 */
+ if ((pagecache->hash_entries= next_power(blocks)) <
+ (blocks) * 5/4)
+ pagecache->hash_entries<<= 1;
+ hash_links= 2 * blocks;
+#if defined(MAX_THREADS)
+ if (hash_links < MAX_THREADS + blocks - 1)
+ hash_links= MAX_THREADS + blocks - 1;
+#endif
+ while ((length= (ALIGN_SIZE(blocks * sizeof(PAGECACHE_BLOCK_LINK)) +
+ ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) +
+ ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) *
+ pagecache->hash_entries))) +
+ (((ulong) blocks) << pagecache->shift) > use_mem)
+ blocks--;
+ /* Allocate memory for cache page buffers */
+ if ((pagecache->block_mem=
+ my_large_malloc((ulong) blocks * pagecache->block_size,
+ MYF(MY_WME))))
+ {
+ /*
+ Allocate memory for blocks, hash_links and hash entries;
+ For each block 2 hash links are allocated
+ */
+ if ((pagecache->block_root=
+ (PAGECACHE_BLOCK_LINK*) my_malloc((uint) length,
+ MYF(0))))
+ break;
+ my_large_free(pagecache->block_mem, MYF(0));
+ pagecache->block_mem= 0;
+ }
+ if (blocks < 8)
+ {
+ my_errno= ENOMEM;
+ goto err;
+ }
+ blocks= blocks / 4*3;
+ }
+ pagecache->blocks_unused= (ulong) blocks;
+ pagecache->disk_blocks= (int) blocks;
+ pagecache->hash_links= hash_links;
+ pagecache->hash_root=
+ (PAGECACHE_HASH_LINK**) ((char*) pagecache->block_root +
+ ALIGN_SIZE(blocks*sizeof(PAGECACHE_BLOCK_LINK)));
+ pagecache->hash_link_root=
+ (PAGECACHE_HASH_LINK*) ((char*) pagecache->hash_root +
+ ALIGN_SIZE((sizeof(PAGECACHE_HASH_LINK*) *
+ pagecache->hash_entries)));
+ bzero((byte*) pagecache->block_root,
+ pagecache->disk_blocks * sizeof(PAGECACHE_BLOCK_LINK));
+ bzero((byte*) pagecache->hash_root,
+ pagecache->hash_entries * sizeof(PAGECACHE_HASH_LINK*));
+ bzero((byte*) pagecache->hash_link_root,
+ pagecache->hash_links * sizeof(PAGECACHE_HASH_LINK));
+ pagecache->hash_links_used= 0;
+ pagecache->free_hash_list= NULL;
+ pagecache->blocks_used= pagecache->blocks_changed= 0;
+
+ pagecache->global_blocks_changed= 0;
+ pagecache->blocks_available=0; /* For debugging */
+
+ /* The LRU chain is empty after initialization */
+ pagecache->used_last= NULL;
+ pagecache->used_ins= NULL;
+ pagecache->free_block_list= NULL;
+ pagecache->time= 0;
+ pagecache->warm_blocks= 0;
+ pagecache->min_warm_blocks= (division_limit ?
+ blocks * division_limit / 100 + 1 :
+ blocks);
+ pagecache->age_threshold= (age_threshold ?
+ blocks * age_threshold / 100 :
+ blocks);
+
+ pagecache->cnt_for_resize_op= 0;
+ pagecache->resize_in_flush= 0;
+ pagecache->can_be_used= 1;
+
+ pagecache->waiting_for_hash_link.last_thread= NULL;
+ pagecache->waiting_for_block.last_thread= NULL;
+ DBUG_PRINT("exit",
+ ("disk_blocks: %d block_root: 0x%lx hash_entries: %d\
+ hash_root: 0x%lx hash_links: %d hash_link_root: 0x%lx",
+ pagecache->disk_blocks, (long) pagecache->block_root,
+ pagecache->hash_entries, (long) pagecache->hash_root,
+ pagecache->hash_links, (long) pagecache->hash_link_root));
+ bzero((gptr) pagecache->changed_blocks,
+ sizeof(pagecache->changed_blocks[0]) *
+ PAGECACHE_CHANGED_BLOCKS_HASH);
+ bzero((gptr) pagecache->file_blocks,
+ sizeof(pagecache->file_blocks[0]) *
+ PAGECACHE_CHANGED_BLOCKS_HASH);
+ }
+
+ pagecache->blocks= pagecache->disk_blocks > 0 ? pagecache->disk_blocks : 0;
+ DBUG_RETURN((uint) pagecache->blocks);
+
+err:
+ error= my_errno;
+ pagecache->disk_blocks= 0;
+ pagecache->blocks= 0;
+ if (pagecache->block_mem)
+ {
+ my_large_free((gptr) pagecache->block_mem, MYF(0));
+ pagecache->block_mem= NULL;
+ }
+ if (pagecache->block_root)
+ {
+ my_free((gptr) pagecache->block_root, MYF(0));
+ pagecache->block_root= NULL;
+ }
+ my_errno= error;
+ pagecache->can_be_used= 0;
+ DBUG_RETURN(0);
+}
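+
+/*
+ A minimal usage sketch (illustration only; error handling and the calling
+ context are omitted, and the sizes below are assumptions):
+
+ PAGECACHE cache;
+ bzero((char*) &cache, sizeof(cache)); /* so that cache.inited == 0 */
+ if (init_pagecache(&cache, 1024L*1024L*10, 0, 0, 8192) == 0)
+ return 1; /* could not allocate even 8 blocks */
+ ... read/write pages through the cache ...
+ end_pagecache(&cache, 1); /* also destroys the mutex */
+
+ The return value is the number of blocks actually allocated. Passing 0 for
+ division_limit and age_threshold keeps the whole LRU chain warm (no blocks
+ are ever promoted to the hot sub-chain).
+*/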
+
+
+/*
+ Flush all blocks in the key cache to disk
+*/
+
+#ifdef NOT_USED
+static int flush_all_key_blocks(PAGECACHE *pagecache)
+{
+#if defined(PAGECACHE_DEBUG)
+ uint cnt=0;
+#endif
+ while (pagecache->blocks_changed > 0)
+ {
+ PAGECACHE_BLOCK_LINK *block;
+ for (block= pagecache->used_last->next_used ; ; block=block->next_used)
+ {
+ if (block->hash_link)
+ {
+#if defined(PAGECACHE_DEBUG)
+ cnt++;
+ KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
+#endif
+ if (flush_pagecache_blocks_int(pagecache, &block->hash_link->file,
+ FLUSH_RELEASE))
+ return 1;
+ break;
+ }
+ if (block == pagecache->used_last)
+ break;
+ }
+ }
+ return 0;
+}
+#endif /* NOT_USED */
+
+/*
+ Resize a key cache
+
+ SYNOPSIS
+ resize_pagecache()
+ pagecache pointer to a page cache data structure
+ use_mem total memory to use for the new key cache
+ division_limit new division limit (if not zero)
+ age_threshold new age threshold (if not zero)
+
+ RETURN VALUE
+ number of blocks in the key cache, if successful,
+ 0 - otherwise.
+
+ NOTES.
+ The function first compares the memory size parameter
+ with the current size of the key cache.
+
+ If they differ, the function frees the memory allocated for the
+ old key cache blocks by calling the end_pagecache function and
+ then rebuilds the key cache with new blocks by calling
+ init_pagecache.
+
+ The function starts the operation only when all other threads
+ performing operations with the key cache let it proceed
+ (when cnt_for_resize_op == 0).
+
+ Before being usable, this function needs:
+ - to receive fixes for BUG#17332 "changing key_buffer_size on a running
+ server can crash under load" similar to those done to the key cache
+ - to have us (Sanja) look at the additional constraints placed on
+ resizing, due to the page locking specific to this page cache.
+ So we disable it for now.
+*/
+#if NOT_USED /* keep disabled until code is fixed see above !! */
+int resize_pagecache(PAGECACHE *pagecache,
+ my_size_t use_mem, uint division_limit,
+ uint age_threshold)
+{
+ int blocks;
+#ifdef THREAD
+ struct st_my_thread_var *thread;
+ WQUEUE *wqueue;
+
+#endif
+ DBUG_ENTER("resize_pagecache");
+
+ if (!pagecache->inited)
+ DBUG_RETURN(pagecache->disk_blocks);
+
+ if(use_mem == pagecache->mem_size)
+ {
+ change_pagecache_param(pagecache, division_limit, age_threshold);
+ DBUG_RETURN(pagecache->disk_blocks);
+ }
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+
+#ifdef THREAD
+ wqueue= &pagecache->resize_queue;
+ thread= my_thread_var;
+ wqueue_link_into_queue(wqueue, thread);
+
+ while (wqueue->last_thread->next != thread)
+ {
+ pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
+ }
+#endif
+
+ pagecache->resize_in_flush= 1;
+ if (flush_all_key_blocks(pagecache))
+ {
+ /* TODO: if this happens, we should write a warning in the log file ! */
+ pagecache->resize_in_flush= 0;
+ blocks= 0;
+ pagecache->can_be_used= 0;
+ goto finish;
+ }
+ pagecache->resize_in_flush= 0;
+ pagecache->can_be_used= 0;
+#ifdef THREAD
+ while (pagecache->cnt_for_resize_op)
+ {
+ KEYCACHE_DBUG_PRINT("resize_pagecache: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
+ }
+#else
+ KEYCACHE_DBUG_ASSERT(pagecache->cnt_for_resize_op == 0);
+#endif
+
+ end_pagecache(pagecache, 0); /* Don't free mutex */
+ /* The following will work even if use_mem is 0 */
+ blocks= init_pagecache(pagecache, use_mem, division_limit,
+ age_threshold, pagecache->block_size);
+
+finish:
+#ifdef THREAD
+ wqueue_unlink_from_queue(wqueue, thread);
+ /* Signal for the next resize request to proceed if any */
+ if (wqueue->last_thread)
+ {
+ KEYCACHE_DBUG_PRINT("resize_pagecache: signal",
+ ("thread %ld", wqueue->last_thread->next->id));
+ pagecache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
+ }
+#endif
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_RETURN(blocks);
+}
+#endif /* NOT_USED */
+
+
+/*
+ Increment counter blocking resize key cache operation
+*/
+static inline void inc_counter_for_resize_op(PAGECACHE *pagecache)
+{
+ pagecache->cnt_for_resize_op++;
+}
+
+
+/*
+ Decrement counter blocking resize key cache operation;
+ Signal the operation to proceed when the counter becomes zero
+*/
+static inline void dec_counter_for_resize_op(PAGECACHE *pagecache)
+{
+#ifdef THREAD
+ struct st_my_thread_var *last_thread;
+ if (!--pagecache->cnt_for_resize_op &&
+ (last_thread= pagecache->resize_queue.last_thread))
+ {
+ KEYCACHE_DBUG_PRINT("dec_counter_for_resize_op: signal",
+ ("thread %ld", last_thread->next->id));
+ pagecache_pthread_cond_signal(&last_thread->next->suspend);
+ }
+#else
+ pagecache->cnt_for_resize_op--;
+#endif
+}
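+
+/*
+ Usage pattern for the two helpers above (a sketch of how the public
+ functions later in this file use them):
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ inc_counter_for_resize_op(pagecache);
+ ... work that must not run concurrently with a cache resize ...
+ dec_counter_for_resize_op(pagecache);
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+
+ resize_pagecache() (currently disabled) waits until the counter drops to
+ zero before tearing down and rebuilding the cache.
+*/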
+
+/*
+ Change the page cache parameters
+
+ SYNOPSIS
+ change_pagecache_param()
+ pagecache pointer to a page cache data structure
+ division_limit new division limit (if not zero)
+ age_threshold new age threshold (if not zero)
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ Presently the function resets the page cache parameters
+ concerning midpoint insertion strategy - division_limit and
+ age_threshold.
+*/
+
+void change_pagecache_param(PAGECACHE *pagecache, uint division_limit,
+ uint age_threshold)
+{
+ DBUG_ENTER("change_pagecache_param");
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ if (division_limit)
+ pagecache->min_warm_blocks= (pagecache->disk_blocks *
+ division_limit / 100 + 1);
+ if (age_threshold)
+ pagecache->age_threshold= (pagecache->disk_blocks *
+ age_threshold / 100);
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Removes page cache from memory. Does NOT flush pages to disk.
+
+ SYNOPSIS
+ end_pagecache()
+ pagecache page cache handle
+ cleanup Complete free (also destroy the cache mutex)
+
+ RETURN VALUE
+ none
+*/
+
+void end_pagecache(PAGECACHE *pagecache, my_bool cleanup)
+{
+ DBUG_ENTER("end_pagecache");
+ DBUG_PRINT("enter", ("key_cache: 0x%lx", (long) pagecache));
+
+ if (!pagecache->inited)
+ DBUG_VOID_RETURN;
+
+ if (pagecache->disk_blocks > 0)
+ {
+ if (pagecache->block_mem)
+ {
+ my_large_free((gptr) pagecache->block_mem, MYF(0));
+ pagecache->block_mem= NULL;
+ my_free((gptr) pagecache->block_root, MYF(0));
+ pagecache->block_root= NULL;
+ }
+ pagecache->disk_blocks= -1;
+ /* Reset blocks_changed to be safe if flush_all_key_blocks is called */
+ pagecache->blocks_changed= 0;
+ }
+
+ DBUG_PRINT("status", ("used: %lu changed: %lu w_requests: %lu "
+ "writes: %lu r_requests: %lu reads: %lu",
+ pagecache->blocks_used, pagecache->global_blocks_changed,
+ (ulong) pagecache->global_cache_w_requests,
+ (ulong) pagecache->global_cache_write,
+ (ulong) pagecache->global_cache_r_requests,
+ (ulong) pagecache->global_cache_read));
+
+ if (cleanup)
+ {
+ pthread_mutex_destroy(&pagecache->cache_lock);
+ pagecache->inited= pagecache->can_be_used= 0;
+ PAGECACHE_DEBUG_CLOSE;
+ }
+ DBUG_VOID_RETURN;
+} /* end_pagecache */
+
+
+/*
+ Unlink a block from the chain of dirty/clean blocks
+*/
+
+static inline void unlink_changed(PAGECACHE_BLOCK_LINK *block)
+{
+ if (block->next_changed)
+ block->next_changed->prev_changed= block->prev_changed;
+ *block->prev_changed= block->next_changed;
+}
+
+
+/*
+ Link a block into the chain of dirty/clean blocks
+*/
+
+static inline void link_changed(PAGECACHE_BLOCK_LINK *block,
+ PAGECACHE_BLOCK_LINK **phead)
+{
+ block->prev_changed= phead;
+ if ((block->next_changed= *phead))
+ (*phead)->prev_changed= &block->next_changed;
+ *phead= block;
+}
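+
+/*
+ Note on the (**prev, *next) lists maintained by link_changed() and
+ unlink_changed() above (the file_blocks and changed_blocks chains):
+ prev_changed does not point to the previous block but to the pointer that
+ refers to this block, i.e. either the list head or the previous block's
+ next_changed field, so unlinking never has to special-case the head.
+ For illustration, after linking blocks A and then B into an empty head:
+
+ head -> B -> A -> NULL
+ B.prev_changed == &head, A.prev_changed == &B.next_changed
+*/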
+
+
+/*
+ Unlink a block from the chain of dirty/clean blocks, if it's asked for,
+ and link it to the chain of clean blocks for the specified file
+*/
+
+static void link_to_file_list(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block,
+ PAGECACHE_FILE *file, my_bool unlink)
+{
+ if (unlink)
+ unlink_changed(block);
+ link_changed(block, &pagecache->file_blocks[FILE_HASH(*file)]);
+ if (block->status & BLOCK_CHANGED)
+ {
+ block->status&= ~BLOCK_CHANGED;
+ block->rec_lsn= 0;
+ pagecache->blocks_changed--;
+ pagecache->global_blocks_changed--;
+ }
+}
+
+
+/*
+ Unlink a block from the chain of clean blocks for the specified
+ file and link it to the chain of dirty blocks for this file
+*/
+
+static inline void link_to_changed_list(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block)
+{
+ unlink_changed(block);
+ link_changed(block,
+ &pagecache->changed_blocks[FILE_HASH(block->hash_link->file)]);
+ block->status|=BLOCK_CHANGED;
+ pagecache->blocks_changed++;
+ pagecache->global_blocks_changed++;
+}
+
+
+/*
+ Link a block to the LRU chain at the beginning or at the end of
+ one of two parts.
+
+ SYNOPSIS
+ link_block()
+ pagecache pointer to a page cache data structure
+ block pointer to the block to link to the LRU chain
+ hot <-> to link the block into the hot subchain
+ at_end <-> to link the block at the end of the subchain
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ The LRU chain is represented by a circular list of block structures.
+ The list is doubly linked, of the (**prev, *next) type.
+ The LRU chain is divided into two parts - hot and warm.
+ There are two pointers to access the last blocks of these two
+ parts. The beginning of the warm part follows right after the
+ end of the hot part.
+ Only blocks of the warm part can be used for replacement.
+ The first block at the beginning of this sub-chain is always
+ taken for eviction (pagecache->used_last->next_used)
+
+ LRU chain: +------+ H O T +------+
+ +----| end |----...<----| beg |----+
+ | +------+last +------+ |
+ v<-link in latest hot (new end) |
+ | link in latest warm (new end)->^
+ | +------+ W A R M +------+ |
+ +----| beg |---->...----| end |----+
+ +------+ +------+ins
+ first for eviction
+*/
+
+static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
+ my_bool hot, my_bool at_end)
+{
+ PAGECACHE_BLOCK_LINK *ins;
+ PAGECACHE_BLOCK_LINK **ptr_ins;
+
+ BLOCK_INFO(block);
+ KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
+#ifdef THREAD
+ if (!hot && pagecache->waiting_for_block.last_thread)
+ {
+ /* Signal that in the LRU warm sub-chain an available block has appeared */
+ struct st_my_thread_var *last_thread=
+ pagecache->waiting_for_block.last_thread;
+ struct st_my_thread_var *first_thread= last_thread->next;
+ struct st_my_thread_var *next_thread= first_thread;
+ PAGECACHE_HASH_LINK *hash_link=
+ (PAGECACHE_HASH_LINK *) first_thread->opt_info;
+ struct st_my_thread_var *thread;
+ do
+ {
+ thread= next_thread;
+ next_thread= thread->next;
+ /*
+ We notify about the event all threads that ask
+ for the same page as the first thread in the queue
+ */
+ if ((PAGECACHE_HASH_LINK *) thread->opt_info == hash_link)
+ {
+ KEYCACHE_DBUG_PRINT("link_block: signal", ("thread %ld", thread->id));
+ pagecache_pthread_cond_signal(&thread->suspend);
+ wqueue_unlink_from_queue(&pagecache->waiting_for_block, thread);
+ block->requests++;
+ }
+ }
+ while (thread != last_thread);
+ hash_link->block= block;
+ KEYCACHE_THREAD_TRACE("link_block: after signaling");
+#if defined(PAGECACHE_DEBUG)
+ KEYCACHE_DBUG_PRINT("link_block",
+ ("linked,unlinked block %u status=%x #requests=%u #available=%u",
+ BLOCK_NUMBER(pagecache, block), block->status,
+ block->requests, pagecache->blocks_available));
+#endif
+ return;
+ }
+#else /* THREAD */
+ KEYCACHE_DBUG_ASSERT(! (!hot && pagecache->waiting_for_block.last_thread));
+ /* Condition not transformed using DeMorgan, to keep the text identical */
+#endif /* THREAD */
+ ptr_ins= hot ? &pagecache->used_ins : &pagecache->used_last;
+ ins= *ptr_ins;
+ if (ins)
+ {
+ ins->next_used->prev_used= &block->next_used;
+ block->next_used= ins->next_used;
+ block->prev_used= &ins->next_used;
+ ins->next_used= block;
+ if (at_end)
+ *ptr_ins= block;
+ }
+ else
+ {
+ /* The LRU chain is empty */
+ pagecache->used_last= pagecache->used_ins= block->next_used= block;
+ block->prev_used= &block->next_used;
+ }
+ KEYCACHE_THREAD_TRACE("link_block");
+#if defined(PAGECACHE_DEBUG)
+ pagecache->blocks_available++;
+ KEYCACHE_DBUG_PRINT("link_block",
+ ("linked block %u:%1u status=%x #requests=%u #available=%u",
+ BLOCK_NUMBER(pagecache, block), at_end, block->status,
+ block->requests, pagecache->blocks_available));
+ KEYCACHE_DBUG_ASSERT((ulong) pagecache->blocks_available <=
+ pagecache->blocks_used);
+#endif
+}
+
+
+/*
+ Unlink a block from the LRU chain
+
+ SYNOPSIS
+ unlink_block()
+ pagecache pointer to a page cache data structure
+ block pointer to the block to unlink from the LRU chain
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ See NOTES for link_block
+*/
+
+static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
+{
+ DBUG_ENTER("unlink_block");
+ DBUG_PRINT("unlink_block", ("unlink 0x%lx", (ulong)block));
+ if (block->next_used == block)
+ /* The list contains only one member */
+ pagecache->used_last= pagecache->used_ins= NULL;
+ else
+ {
+ block->next_used->prev_used= block->prev_used;
+ *block->prev_used= block->next_used;
+ if (pagecache->used_last == block)
+ pagecache->used_last= STRUCT_PTR(PAGECACHE_BLOCK_LINK,
+ next_used, block->prev_used);
+ if (pagecache->used_ins == block)
+ pagecache->used_ins= STRUCT_PTR(PAGECACHE_BLOCK_LINK,
+ next_used, block->prev_used);
+ }
+ block->next_used= NULL;
+
+ KEYCACHE_THREAD_TRACE("unlink_block");
+#if defined(PAGECACHE_DEBUG)
+ KEYCACHE_DBUG_ASSERT(pagecache->blocks_available != 0);
+ pagecache->blocks_available--;
+ KEYCACHE_DBUG_PRINT("unlink_block",
+ ("unlinked block 0x%lx (%u) status=%x #requests=%u #available=%u",
+ (ulong)block, BLOCK_NUMBER(pagecache, block), block->status,
+ block->requests, pagecache->blocks_available));
+ BLOCK_INFO(block);
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Register requests for a block
+
+ SYNOPSIS
+ reg_requests()
+ pagecache this page cache reference
+ block the block we request reference
+ count how many requests we register (it is 1 everywhere)
+
+ NOTE
+ Registration of a request means we are going to use this block, so we
+ exclude it from the LRU chain if this is the first request
+*/
+static void reg_requests(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
+ int count)
+{
+ DBUG_ENTER("reg_requests");
+ DBUG_PRINT("enter", ("block 0x%lx (%u) status=%x, reqs: %u",
+ (ulong)block, BLOCK_NUMBER(pagecache, block),
+ block->status, block->requests));
+ BLOCK_INFO(block);
+ if (! block->requests)
+ /* First request for the block unlinks it */
+ unlink_block(pagecache, block);
+ block->requests+= count;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Unregister request for a block
+ linking it to the LRU chain if it's the last request
+
+ SYNOPSIS
+ unreg_request()
+ pagecache pointer to a page cache data structure
+ block pointer to the block to link to the LRU chain
+ at_end <-> to link the block at the end of the LRU chain
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ Every linking to the LRU chain decrements by one a special block
+ counter (if it's positive). If the at_end parameter is TRUE the block is
+ added either at the end of warm sub-chain or at the end of hot sub-chain.
+ It is added to the hot subchain if its counter is zero and number of
+ blocks in warm sub-chain is not less than some low limit (determined by
+ the division_limit parameter). Otherwise the block is added to the warm
+ sub-chain. If the at_end parameter is FALSE the block is always added
+ at beginning of the warm sub-chain.
+ Thus a warm block can be promoted to the hot sub-chain when its counter
+ becomes zero for the first time.
+ At the same time the block at the very beginning of the hot subchain
+ might be moved to the beginning of the warm subchain if it stays untouched
+ for too long (this time is determined by the age_threshold parameter).
+*/
+
+static void unreg_request(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block, int at_end)
+{
+ DBUG_ENTER("unreg_request");
+ DBUG_PRINT("enter", ("block 0x%lx (%u) status=%x, reqs: %u",
+ (ulong)block, BLOCK_NUMBER(pagecache, block),
+ block->status, block->requests));
+ BLOCK_INFO(block);
+ DBUG_ASSERT(block->requests > 0);
+ if (! --block->requests)
+ {
+ my_bool hot;
+ if (block->hits_left)
+ block->hits_left--;
+ hot= !block->hits_left && at_end &&
+ pagecache->warm_blocks > pagecache->min_warm_blocks;
+ if (hot)
+ {
+ if (block->temperature == BLOCK_WARM)
+ pagecache->warm_blocks--;
+ block->temperature= BLOCK_HOT;
+ KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks: %lu",
+ pagecache->warm_blocks));
+ }
+ link_block(pagecache, block, hot, (my_bool)at_end);
+ block->last_hit_time= pagecache->time;
+ pagecache->time++;
+
+ block= pagecache->used_ins;
+ /* Check if a hot block should be demoted to the warm sub-chain */
+ if (block && pagecache->time - block->last_hit_time >
+ pagecache->age_threshold)
+ {
+ unlink_block(pagecache, block);
+ link_block(pagecache, block, 0, 0);
+ if (block->temperature != BLOCK_WARM)
+ {
+ pagecache->warm_blocks++;
+ block->temperature= BLOCK_WARM;
+ }
+ KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks: %lu",
+ pagecache->warm_blocks));
+ }
+ }
+ DBUG_VOID_RETURN;
+}
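+
+/*
+ A worked example of the promotion/demotion arithmetic above (the numbers
+ are assumptions, for illustration only): with 1000 blocks and
+ division_limit == 10, init_pagecache() sets min_warm_blocks ==
+ 1000*10/100 + 1 == 101, so a released block whose hits_left has reached
+ zero is promoted to the hot sub-chain only while more than 101 blocks are
+ still warm. With age_threshold == 300, pagecache->age_threshold ==
+ 1000*300/100 == 3000 ticks of pagecache->time; the hot block pointed to by
+ pagecache->used_ins is demoted back to the warm sub-chain once it has not
+ been hit for longer than that.
+*/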
+
+/*
+ Remove a reader of the page in block
+*/
+
+static inline void remove_reader(PAGECACHE_BLOCK_LINK *block)
+{
+ DBUG_ENTER("remove_reader");
+ BLOCK_INFO(block);
+ DBUG_ASSERT(block->hash_link->requests > 0);
+#ifdef THREAD
+ if (! --block->hash_link->requests && block->condvar)
+ pagecache_pthread_cond_signal(block->condvar);
+#else
+ --block->hash_link->requests;
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Wait until the last reader of the page in block
+ signals on its termination
+*/
+
+static inline void wait_for_readers(PAGECACHE *pagecache
+ __attribute__((unused)),
+ PAGECACHE_BLOCK_LINK *block)
+{
+#ifdef THREAD
+ struct st_my_thread_var *thread= my_thread_var;
+ while (block->hash_link->requests)
+ {
+ KEYCACHE_DBUG_PRINT("wait_for_readers: wait",
+ ("suspend thread %ld block %u",
+ thread->id, BLOCK_NUMBER(pagecache, block)));
+ block->condvar= &thread->suspend;
+ pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
+ block->condvar= NULL;
+ }
+#else
+ KEYCACHE_DBUG_ASSERT(block->hash_link->requests == 0);
+#endif
+}
+
+
+/*
+ Add a hash link to a bucket in the hash_table
+*/
+
+static inline void link_hash(PAGECACHE_HASH_LINK **start,
+ PAGECACHE_HASH_LINK *hash_link)
+{
+ if (*start)
+ (*start)->prev= &hash_link->next;
+ hash_link->next= *start;
+ hash_link->prev= start;
+ *start= hash_link;
+}
+
+
+/*
+ Remove a hash link from the hash table
+*/
+
+static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
+{
+ KEYCACHE_DBUG_PRINT("unlink_hash", ("fd: %u pos_ %lu #requests=%u",
+ (uint) hash_link->file.file, (ulong) hash_link->pageno,
+ hash_link->requests));
+ KEYCACHE_DBUG_ASSERT(hash_link->requests == 0);
+ if ((*hash_link->prev= hash_link->next))
+ hash_link->next->prev= hash_link->prev;
+ hash_link->block= NULL;
+#ifdef THREAD
+ if (pagecache->waiting_for_hash_link.last_thread)
+ {
+ /* Signal that a free hash link has appeared */
+ struct st_my_thread_var *last_thread=
+ pagecache->waiting_for_hash_link.last_thread;
+ struct st_my_thread_var *first_thread= last_thread->next;
+ struct st_my_thread_var *next_thread= first_thread;
+ PAGECACHE_PAGE *first_page= (PAGECACHE_PAGE *) (first_thread->opt_info);
+ struct st_my_thread_var *thread;
+
+ hash_link->file= first_page->file;
+ hash_link->pageno= first_page->pageno;
+ do
+ {
+ PAGECACHE_PAGE *page;
+ thread= next_thread;
+ page= (PAGECACHE_PAGE *) thread->opt_info;
+ next_thread= thread->next;
+ /*
+ We notify about the event all threads that ask
+ for the same page as the first thread in the queue
+ */
+ if (page->file.file == hash_link->file.file &&
+ page->pageno == hash_link->pageno)
+ {
+ KEYCACHE_DBUG_PRINT("unlink_hash: signal", ("thread %ld", thread->id));
+ pagecache_pthread_cond_signal(&thread->suspend);
+ wqueue_unlink_from_queue(&pagecache->waiting_for_hash_link, thread);
+ }
+ }
+ while (thread != last_thread);
+ link_hash(&pagecache->hash_root[PAGECACHE_HASH(pagecache,
+ hash_link->file,
+ hash_link->pageno)],
+ hash_link);
+ return;
+ }
+#else /* THREAD */
+ KEYCACHE_DBUG_ASSERT(! (pagecache->waiting_for_hash_link.last_thread));
+#endif /* THREAD */
+ hash_link->next= pagecache->free_hash_list;
+ pagecache->free_hash_list= hash_link;
+}
+
+
+/*
+ Get the hash link for the page if it is in the cache (do not put the
+ page in the cache if it is absent there)
+
+ SYNOPSIS
+ get_present_hash_link()
+ pagecache Pagecache reference
+ file file ID
+ pageno page number in the file
+ start where to put the pointer to the found hash bucket (for
+ referring to it directly)
+
+ RETURN
+ pointer to the found hash link, or NULL if the page is not in the cache
+*/
+
+static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ PAGECACHE_HASH_LINK ***start)
+{
+ reg1 PAGECACHE_HASH_LINK *hash_link;
+#if defined(PAGECACHE_DEBUG)
+ int cnt;
+#endif
+ DBUG_ENTER("get_present_hash_link");
+
+ KEYCACHE_DBUG_PRINT("get_present_hash_link", ("fd: %u pos: %lu",
+ (uint) file->file, (ulong) pageno));
+
+ /*
+ Find the bucket in the hash table for the pair (file, pageno);
+ start contains the head of the bucket list,
+ hash_link points to the first member of the list
+ */
+ hash_link= *(*start= &pagecache->hash_root[PAGECACHE_HASH(pagecache,
+ *file, pageno)]);
+#if defined(PAGECACHE_DEBUG)
+ cnt= 0;
+#endif
+ /* Look for an element for the pair (file, pageno) in the bucket chain */
+ while (hash_link &&
+ (hash_link->pageno != pageno ||
+ hash_link->file.file != file->file))
+ {
+ hash_link= hash_link->next;
+#if defined(PAGECACHE_DEBUG)
+ cnt++;
+ if (! (cnt <= pagecache->hash_links_used))
+ {
+ int i;
+ for (i=0, hash_link= **start ;
+ i < cnt ; i++, hash_link= hash_link->next)
+ {
+ KEYCACHE_DBUG_PRINT("get_present_hash_link", ("fd: %u pos: %lu",
+ (uint) hash_link->file.file, (ulong) hash_link->pageno));
+ }
+ }
+ KEYCACHE_DBUG_ASSERT(cnt <= pagecache->hash_links_used);
+#endif
+ }
+ if (hash_link)
+ {
+ /* Register the request for the page */
+ hash_link->requests++;
+ }
+
+ DBUG_RETURN(hash_link);
+}
+
+
+/*
+ Get the hash link for a page
+*/
+
+static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno)
+{
+ reg1 PAGECACHE_HASH_LINK *hash_link;
+ PAGECACHE_HASH_LINK **start;
+
+ KEYCACHE_DBUG_PRINT("get_hash_link", ("fd: %u pos: %lu",
+ (uint) file->file, (ulong) pageno));
+
+restart:
+ /* try to find the page in the cache */
+ hash_link= get_present_hash_link(pagecache, file, pageno,
+ &start);
+ if (!hash_link)
+ {
+ /* There is no hash link in the hash table for the pair (file, pageno) */
+ if (pagecache->free_hash_list)
+ {
+ hash_link= pagecache->free_hash_list;
+ pagecache->free_hash_list= hash_link->next;
+ }
+ else if (pagecache->hash_links_used < pagecache->hash_links)
+ {
+ hash_link= &pagecache->hash_link_root[pagecache->hash_links_used++];
+ }
+ else
+ {
+#ifdef THREAD
+ /* Wait for a free hash link */
+ struct st_my_thread_var *thread= my_thread_var;
+ PAGECACHE_PAGE page;
+ KEYCACHE_DBUG_PRINT("get_hash_link", ("waiting"));
+ page.file= *file;
+ page.pageno= pageno;
+ thread->opt_info= (void *) &page;
+ wqueue_link_into_queue(&pagecache->waiting_for_hash_link, thread);
+ KEYCACHE_DBUG_PRINT("get_hash_link: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend,
+ &pagecache->cache_lock);
+ thread->opt_info= NULL;
+#else
+ KEYCACHE_DBUG_ASSERT(0);
+#endif
+ DBUG_PRINT("info", ("restarting..."));
+ goto restart;
+ }
+ hash_link->file= *file;
+ hash_link->pageno= pageno;
+ link_hash(start, hash_link);
+ /* Register the request for the page */
+ hash_link->requests++;
+ }
+
+ return hash_link;
+}
+
+
+/*
+ Get a block for the file page requested by a pagecache read/write operation;
+ If the page is not in the cache return a free block, if there is none
+ return the lru block after saving its buffer if the page is dirty.
+
+ SYNOPSIS
+
+ find_block()
+ pagecache pointer to a page cache data structure
+ file handler for the file to read page from
+ pageno number of the page in the file
+ init_hits_left how to initialize the block counter for the page
+ wrmode <-> get for writing
+ reg_req Register a request to the page
+ page_st out {PAGE_READ,PAGE_TO_BE_READ,PAGE_WAIT_TO_BE_READ}
+
+ RETURN VALUE
+ Pointer to the found block if successful, 0 - otherwise
+
+ NOTES.
+ For the page of the file positioned at pageno the function checks whether
+ the page is in the page cache specified by the first parameter.
+ If this is the case it immediately returns the block.
+ If not, the function first chooses a block for this page. If there are
+ no unused blocks in the page cache yet, the function takes the block
+ at the very beginning of the warm sub-chain. It saves the page in that
+ block if it's dirty before returning the pointer to it.
+ The function returns in the page_st parameter the following values:
+ PAGE_READ - if the page is already in the block,
+ PAGE_TO_BE_READ - if it is to be read yet by the current thread
+ PAGE_WAIT_TO_BE_READ - if it is to be read by another thread
+ If an error occurs the BLOCK_ERROR bit is set in the block status.
+ It might happen that there are no blocks in the LRU chain (in the warm
+ part) - all blocks are unlinked for some read/write operations. Then the
+ function waits until the first of these operations links a block back.
+*/
+
+static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ int init_hits_left,
+ my_bool wrmode,
+ my_bool reg_req,
+ int *page_st)
+{
+ PAGECACHE_HASH_LINK *hash_link;
+ PAGECACHE_BLOCK_LINK *block;
+ int error= 0;
+ int page_status;
+
+ DBUG_ENTER("find_block");
+ KEYCACHE_THREAD_TRACE("find_block:begin");
+ DBUG_PRINT("enter", ("fd: %d pos: %lu wrmode: %d",
+ file->file, (ulong) pageno, wrmode));
+ KEYCACHE_DBUG_PRINT("find_block", ("fd: %d pos: %lu wrmode: %d",
+ file->file, (ulong) pageno,
+ wrmode));
+#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
+ DBUG_EXECUTE("check_pagecache",
+ test_key_cache(pagecache, "start of find_block", 0););
+#endif
+
+restart:
+ /* Find the hash link for the requested page (file, pageno) */
+ hash_link= get_hash_link(pagecache, file, pageno);
+
+ page_status= -1;
+ if ((block= hash_link->block) &&
+ block->hash_link == hash_link && (block->status & BLOCK_READ))
+ page_status= PAGE_READ;
+
+ if (wrmode && pagecache->resize_in_flush)
+ {
+ /* This is a write request during the flush phase of a resize operation */
+
+ if (page_status != PAGE_READ)
+ {
+ /* We don't need the page in the cache: we are going to write on disk */
+ DBUG_ASSERT(hash_link->requests > 0);
+ hash_link->requests--;
+ unlink_hash(pagecache, hash_link);
+ return 0;
+ }
+ if (!(block->status & BLOCK_IN_FLUSH))
+ {
+ DBUG_ASSERT(hash_link->requests > 0);
+ hash_link->requests--;
+ /*
+ Remove block to invalidate the page in the block buffer
+ as we are going to write directly on disk.
+ Although we have an exclusive lock for the updated key part
+ the control can be yielded by the current thread as we might
+ have unfinished readers of other key parts in the block
+ buffer. Still we are guaranteed not to have any readers
+ of the key part we are writing into until the block is
+ removed from the cache as we set the BLOCK_REASSIGNED
+ flag (see the code below that handles reading requests).
+ */
+ free_block(pagecache, block);
+ return 0;
+ }
+ /* Wait until the page is flushed on disk */
+ DBUG_ASSERT(hash_link->requests > 0);
+ hash_link->requests--;
+ {
+#ifdef THREAD
+ struct st_my_thread_var *thread= my_thread_var;
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
+ do
+ {
+ KEYCACHE_DBUG_PRINT("find_block: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend,
+ &pagecache->cache_lock);
+ }
+ while(thread->next);
+#else
+ KEYCACHE_DBUG_ASSERT(0);
+ /*
+ Given the use of "resize_in_flush", it seems impossible
+ that this whole branch is ever entered in single-threaded case
+ because "(wrmode && pagecache->resize_in_flush)" cannot be true.
+ TODO: Check this, and then put the whole branch into the
+ "#ifdef THREAD" guard.
+ */
+#endif
+ }
+ /* Invalidate page in the block if it has not been done yet */
+ if (block->status)
+ free_block(pagecache, block);
+ return 0;
+ }
+
+ if (page_status == PAGE_READ &&
+ (block->status & (BLOCK_IN_SWITCH | BLOCK_REASSIGNED)))
+ {
+ /* This is a request for a page to be removed from cache */
+
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("request for old page in block %u "
+ "wrmode: %d block->status: %d",
+ BLOCK_NUMBER(pagecache, block), wrmode,
+ block->status));
+ /*
+ Only reading requests can proceed until the old dirty page is flushed,
+ all others are to be suspended, then resubmitted
+ */
+ if (!wrmode && !(block->status & BLOCK_REASSIGNED))
+ {
+ if (reg_req)
+ reg_requests(pagecache, block, 1);
+ }
+ else
+ {
+ DBUG_ASSERT(hash_link->requests > 0);
+ hash_link->requests--;
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("request waiting for old page to be saved"));
+ {
+#ifdef THREAD
+ struct st_my_thread_var *thread= my_thread_var;
+ /* Put the request into the queue of those waiting for the old page */
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
+ /* Wait until the request can be resubmitted */
+ do
+ {
+ KEYCACHE_DBUG_PRINT("find_block: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend,
+ &pagecache->cache_lock);
+ }
+ while(thread->next);
+#else
+ KEYCACHE_DBUG_ASSERT(0);
+ /* No parallel requests in single-threaded case */
+#endif
+ }
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("request for old page resubmitted"));
+ DBUG_PRINT("info", ("restarting..."));
+ /* Resubmit the request */
+ goto restart;
+ }
+ block->status&= ~BLOCK_IN_SWITCH;
+ }
+ else
+ {
+ /* This is a request for a new page or for a page not to be removed */
+ if (! block)
+ {
+ /* No block is assigned for the page yet */
+ if (pagecache->blocks_unused)
+ {
+ if (pagecache->free_block_list)
+ {
+ /* There is a block in the free list. */
+ block= pagecache->free_block_list;
+ pagecache->free_block_list= block->next_used;
+ block->next_used= NULL;
+ }
+ else
+ {
+ /* There are some never-used blocks, take the first of them */
+ block= &pagecache->block_root[pagecache->blocks_used];
+ block->buffer= ADD_TO_PTR(pagecache->block_mem,
+ ((ulong) pagecache->blocks_used*
+ pagecache->block_size),
+ byte*);
+ pagecache->blocks_used++;
+ }
+ pagecache->blocks_unused--;
+ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins == 0);
+ block->status= 0;
+#ifndef DBUG_OFF
+ block->type= PAGECACHE_EMPTY_PAGE;
+#endif
+ block->requests= 1;
+ block->temperature= BLOCK_COLD;
+ block->hits_left= init_hits_left;
+ block->last_hit_time= 0;
+ link_to_file_list(pagecache, block, file, 0);
+ block->hash_link= hash_link;
+ hash_link->block= block;
+ page_status= PAGE_TO_BE_READ;
+ DBUG_PRINT("info", ("page to be read set for page 0x%lx",
+ (ulong)block));
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("got free or never used block %u",
+ BLOCK_NUMBER(pagecache, block)));
+ }
+ else
+ {
+ /* There are no never used blocks, use a block from the LRU chain */
+
+ /*
+ Wait until a new block is added to the LRU chain;
+ several threads might wait here for the same page,
+ all of them must get the same block
+ */
+
+#ifdef THREAD
+ if (! pagecache->used_last)
+ {
+ struct st_my_thread_var *thread= my_thread_var;
+ thread->opt_info= (void *) hash_link;
+ wqueue_link_into_queue(&pagecache->waiting_for_block, thread);
+ do
+ {
+ KEYCACHE_DBUG_PRINT("find_block: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend,
+ &pagecache->cache_lock);
+ }
+ while (thread->next);
+ thread->opt_info= NULL;
+ }
+#else
+ KEYCACHE_DBUG_ASSERT(pagecache->used_last);
+#endif
+ block= hash_link->block;
+ if (! block)
+ {
+ /*
+ Take the first block from the LRU chain
+ unlinking it from the chain
+ */
+ block= pagecache->used_last->next_used;
+ block->hits_left= init_hits_left;
+ block->last_hit_time= 0;
+ if (reg_req)
+ reg_requests(pagecache, block, 1);
+ hash_link->block= block;
+ }
+ BLOCK_INFO(block);
+ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins == 0);
+
+ if (block->hash_link != hash_link &&
+ ! (block->status & BLOCK_IN_SWITCH) )
+ {
+ /* this is a primary request for a new page */
+ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins == 0);
+ block->status|= (BLOCK_IN_SWITCH | BLOCK_WRLOCK);
+
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("got block %u for new page",
+ BLOCK_NUMBER(pagecache, block)));
+
+ if (block->status & BLOCK_CHANGED)
+ {
+ /* The block contains a dirty page - push it out of the cache */
+
+ KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ /*
+ The call is thread safe because only the current
+ thread might change the block->hash_link value
+ */
+ DBUG_ASSERT(block->pins == 0);
+ error= pagecache_fwrite(pagecache,
+ &block->hash_link->file,
+ block->buffer,
+ block->hash_link->pageno,
+ block->type,
+ MYF(MY_NABP | MY_WAIT_IF_FULL));
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ pagecache->global_cache_write++;
+ }
+
+ block->status|= BLOCK_REASSIGNED;
+ if (block->hash_link)
+ {
+ /*
+ Wait until all pending read requests
+ for this page are executed
+ (we could have avoided this waiting, if we had read
+ a page in the cache in a sweep, without yielding control)
+ */
+ wait_for_readers(pagecache, block);
+
+ /* Remove the hash link for this page from the hash table */
+ unlink_hash(pagecache, block->hash_link);
+ /* All pending requests for this page must be resubmitted */
+#ifdef THREAD
+ if (block->wqueue[COND_FOR_SAVED].last_thread)
+ wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
+#endif
+ }
+ link_to_file_list(pagecache, block, file,
+ (my_bool)(block->hash_link ? 1 : 0));
+ BLOCK_INFO(block);
+ block->status= error? BLOCK_ERROR : 0;
+#ifndef DBUG_OFF
+ block->type= PAGECACHE_EMPTY_PAGE;
+#endif
+ block->hash_link= hash_link;
+ page_status= PAGE_TO_BE_READ;
+ DBUG_PRINT("info", ("page to be read set for page 0x%lx",
+ (ulong)block));
+
+ KEYCACHE_DBUG_ASSERT(block->hash_link->block == block);
+ KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link);
+ }
+ else
+ {
+ /* This is for secondary requests for a new page only */
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("block->hash_link: %p hash_link: %p "
+ "block->status: %u", block->hash_link,
+ hash_link, block->status ));
+ page_status= (((block->hash_link == hash_link) &&
+ (block->status & BLOCK_READ)) ?
+ PAGE_READ : PAGE_WAIT_TO_BE_READ);
+ }
+ }
+ pagecache->global_cache_read++;
+ }
+ else
+ {
+ if (reg_req)
+ reg_requests(pagecache, block, 1);
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("block->hash_link: %p hash_link: %p "
+ "block->status: %u", block->hash_link,
+ hash_link, block->status ));
+ page_status= (((block->hash_link == hash_link) &&
+ (block->status & BLOCK_READ)) ?
+ PAGE_READ : PAGE_WAIT_TO_BE_READ);
+ }
+ }
+
+ KEYCACHE_DBUG_ASSERT(page_status != -1);
+ *page_st= page_status;
+ DBUG_PRINT("info",
+ ("block: 0x%lx fd: %u pos %lu block->status %u page_status %u",
+ (ulong) block, (uint) file->file,
+ (ulong) pageno, block->status, (uint) page_status));
+ KEYCACHE_DBUG_PRINT("find_block",
+ ("block: 0x%lx fd: %d pos: %lu block->status: %u page_status: %d",
+ (ulong) block,
+ file->file, (ulong) pageno, block->status,
+ page_status));
+
+#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
+ DBUG_EXECUTE("check_pagecache",
+ test_key_cache(pagecache, "end of find_block",0););
+#endif
+ KEYCACHE_THREAD_TRACE("find_block:end");
+ DBUG_RETURN(block);
+}
+
+
+static void add_pin(PAGECACHE_BLOCK_LINK *block)
+{
+ DBUG_ENTER("add_pin");
+ DBUG_PRINT("enter", ("block 0x%lx pins: %u",
+ (ulong) block,
+ block->pins));
+ BLOCK_INFO(block);
+ block->pins++;
+#ifdef PAGECACHE_DEBUG
+ {
+ PAGECACHE_PIN_INFO *info=
+ (PAGECACHE_PIN_INFO *)my_malloc(sizeof(PAGECACHE_PIN_INFO), MYF(0));
+ info->thread= my_thread_var;
+ info_link(&block->pin_list, info);
+ }
+#endif
+ DBUG_VOID_RETURN;
+}
+
+static void remove_pin(PAGECACHE_BLOCK_LINK *block)
+{
+ DBUG_ENTER("remove_pin");
+ DBUG_PRINT("enter", ("block 0x%lx pins: %u",
+ (ulong) block,
+ block->pins));
+ BLOCK_INFO(block);
+ DBUG_ASSERT(block->pins > 0);
+ block->pins--;
+#ifdef PAGECACHE_DEBUG
+ {
+ PAGECACHE_PIN_INFO *info= info_find(block->pin_list, my_thread_var);
+ DBUG_ASSERT(info != 0);
+ info_unlink(info);
+ my_free((gptr) info, MYF(0));
+ }
+#endif
+ DBUG_VOID_RETURN;
+}
+#ifdef PAGECACHE_DEBUG
+static void info_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
+{
+ PAGECACHE_LOCK_INFO *info=
+ (PAGECACHE_LOCK_INFO *)my_malloc(sizeof(PAGECACHE_LOCK_INFO), MYF(0));
+ info->thread= my_thread_var;
+ info->write_lock= wl;
+ info_link((PAGECACHE_PIN_INFO **)&block->lock_list,
+ (PAGECACHE_PIN_INFO *)info);
+}
+static void info_remove_lock(PAGECACHE_BLOCK_LINK *block)
+{
+ PAGECACHE_LOCK_INFO *info=
+ (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
+ my_thread_var);
+ DBUG_ASSERT(info != 0);
+ info_unlink((PAGECACHE_PIN_INFO *)info);
+ my_free((gptr)info, MYF(0));
+}
+static void info_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
+{
+ PAGECACHE_LOCK_INFO *info=
+ (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
+ my_thread_var);
+ DBUG_ASSERT(info != 0 && info->write_lock != wl);
+ info->write_lock= wl;
+}
+#else
+#define info_add_lock(B,W)
+#define info_remove_lock(B)
+#define info_change_lock(B,W)
+#endif
+
+/*
+ Put on the block write lock
+
+ SYNOPSIS
+ get_wrlock()
+ pagecache pointer to a page cache data structure
+ block the block to work with
+
+ RETURN
+ 0 - OK
+ 1 - Can't lock this block, need retry
+*/
+
+static my_bool get_wrlock(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block)
+{
+ PAGECACHE_FILE file= block->hash_link->file;
+ pgcache_page_no_t pageno= block->hash_link->pageno;
+ DBUG_ENTER("get_wrlock");
+ DBUG_PRINT("info", ("the block 0x%lx "
+ "files %d(%d) pages %d(%d)",
+ (ulong)block,
+ file.file, block->hash_link->file.file,
+ pageno, block->hash_link->pageno));
+ BLOCK_INFO(block);
+ while (block->status & BLOCK_WRLOCK)
+ {
+ /* Lock failed we will wait */
+#ifdef THREAD
+ struct st_my_thread_var *thread= my_thread_var;
+ DBUG_PRINT("info", ("fail to lock, waiting... 0x%lx", (ulong)block));
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread);
+ dec_counter_for_resize_op(pagecache);
+ do
+ {
+ KEYCACHE_DBUG_PRINT("get_wrlock: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend,
+ &pagecache->cache_lock);
+ }
+ while(thread->next);
+#else
+ DBUG_ASSERT(0);
+#endif
+ BLOCK_INFO(block);
+ if ((block->status & (BLOCK_REASSIGNED | BLOCK_IN_SWITCH)) ||
+ file.file != block->hash_link->file.file ||
+ pageno != block->hash_link->pageno)
+ {
+ DBUG_PRINT("info", ("the block 0x%lx changed => need retry"
+ "status %x files %d != %d or pages %d !=%d",
+ (ulong)block, block->status,
+ file.file, block->hash_link->file.file,
+ pageno, block->hash_link->pageno));
+ DBUG_RETURN(1);
+ }
+ }
+ DBUG_ASSERT(block->pins == 0);
+ /* we are doing it by global cache mutex protection, so it is OK */
+ block->status|= BLOCK_WRLOCK;
+ DBUG_PRINT("info", ("WR lock set, block 0x%lx", (ulong)block));
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Remove write lock from the block
+
+ SYNOPSIS
+ release_wrlock()
+ pagecache pointer to a page cache data structure
+ block the block to work with
+
+ RETURN
+ 0 - OK
+*/
+
+static void release_wrlock(PAGECACHE_BLOCK_LINK *block)
+{
+ DBUG_ENTER("release_wrlock");
+ BLOCK_INFO(block);
+ DBUG_ASSERT(block->status & BLOCK_WRLOCK);
+ DBUG_ASSERT(block->pins > 0);
+ block->status&= ~BLOCK_WRLOCK;
+ DBUG_PRINT("info", ("WR lock reset, block 0x%lx", (ulong)block));
+#ifdef THREAD
+ /* release all threads waiting for write lock */
+ if (block->wqueue[COND_FOR_WRLOCK].last_thread)
+ wqueue_release_queue(&block->wqueue[COND_FOR_WRLOCK]);
+#endif
+ BLOCK_INFO(block);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Try to lock/unlock and pin/unpin the block
+
+ SYNOPSIS
+ make_lock_and_pin()
+ pagecache pointer to a page cache data structure
+ block the block to work with
+ lock lock change mode
+ pin pinchange mode
+
+ RETURN
+ 0 - OK
+ 1 - Try to lock the block failed
+*/
+
+static my_bool make_lock_and_pin(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block,
+ enum pagecache_page_lock lock,
+ enum pagecache_page_pin pin)
+{
+ DBUG_ENTER("make_lock_and_pin");
+ DBUG_PRINT("enter", ("block: 0x%lx (%u), wrlock: %c pins: %u, lock %s, pin: %s",
+ (ulong)block, BLOCK_NUMBER(pagecache, block),
+ ((block->status & BLOCK_WRLOCK)?'Y':'N'),
+ block->pins,
+ page_cache_page_lock_str[lock],
+ page_cache_page_pin_str[pin]));
+ BLOCK_INFO(block);
+#ifdef PAGECACHE_DEBUG
+ DBUG_ASSERT(info_check_pin(block, pin) == 0 &&
+ info_check_lock(block, lock, pin) == 0);
+#endif
+ switch (lock)
+ {
+ case PAGECACHE_LOCK_WRITE: /* free -> write */
+ /* Writelock and pin the buffer */
+ if (get_wrlock(pagecache, block))
+ {
+ /* can't lock => need retry */
+ goto retry;
+ }
+
+ /* The cache is locked, so there is nothing to be afraid of */
+ add_pin(block);
+ info_add_lock(block, 1);
+ break;
+ case PAGECACHE_LOCK_WRITE_TO_READ: /* write -> read */
+ case PAGECACHE_LOCK_WRITE_UNLOCK: /* write -> free */
+ /*
+ Remove the write lock and take a read lock (which is a no-op in our
+ implementation)
+ */
+ release_wrlock(block);
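+ /* fall through */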
+ case PAGECACHE_LOCK_READ_UNLOCK: /* read -> free */
+ case PAGECACHE_LOCK_LEFT_READLOCKED: /* read -> read */
+ if (pin == PAGECACHE_UNPIN)
+ {
+ remove_pin(block);
+ }
+ if (lock == PAGECACHE_LOCK_WRITE_TO_READ)
+ {
+ info_change_lock(block, 0);
+ }
+ else if (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
+ lock == PAGECACHE_LOCK_READ_UNLOCK)
+ {
+ info_remove_lock(block);
+ }
+ break;
+ case PAGECACHE_LOCK_READ: /* free -> read */
+ if (pin == PAGECACHE_PIN)
+ {
+ /* The cache is locked, so there is nothing to be afraid of */
+ add_pin(block);
+ }
+ info_add_lock(block, 0);
+ break;
+ case PAGECACHE_LOCK_LEFT_UNLOCKED: /* free -> free */
+ case PAGECACHE_LOCK_LEFT_WRITELOCKED: /* write -> write */
+ break; /* do nothing */
+ default:
+ DBUG_ASSERT(0); /* Should never happen */
+ }
+
+ BLOCK_INFO(block);
+ DBUG_RETURN(0);
+retry:
+ DBUG_PRINT("INFO", ("Retry block 0x%lx", (ulong)block));
+ BLOCK_INFO(block);
+ DBUG_ASSERT(block->hash_link->requests > 0);
+ block->hash_link->requests--;
+ DBUG_ASSERT(block->requests > 0);
+ unreg_request(pagecache, block, 1);
+ BLOCK_INFO(block);
+ DBUG_RETURN(1);
+
+}
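+
+/*
+ Typical lock/pin transitions passed to make_lock_and_pin() (a summary of
+ the switch above and of the rules enforced by info_check_lock(), not new
+ rules):
+
+ take a page for writing: (PAGECACHE_LOCK_WRITE, PAGECACHE_PIN)
+ release it afterwards: (PAGECACHE_LOCK_WRITE_UNLOCK, PAGECACHE_UNPIN)
+ downgrade write to read: (PAGECACHE_LOCK_WRITE_TO_READ,
+ PAGECACHE_PIN_LEFT_PINNED)
+ unpinned read: (PAGECACHE_LOCK_READ, PAGECACHE_PIN_LEFT_UNPINNED),
+ then (PAGECACHE_LOCK_READ_UNLOCK, PAGECACHE_PIN_LEFT_UNPINNED)
+
+ A return value of 1 means the block was reassigned while we waited for the
+ write lock; the caller has to retry the whole operation.
+*/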
+
+
+/*
+ Read into a key cache block buffer from disk.
+
+ SYNOPSIS
+
+ read_block()
+ pagecache pointer to a page cache data structure
+ block block to which buffer the data is to be read
+ primary <-> the current thread will read the data
+ validator validator of read from the disk data
+ validator_data pointer to the data need by the validator
+
+ RETURN VALUE
+ None
+
+ NOTES.
+ The function either reads a page data from file to the block buffer,
+ or waits until another thread reads it. What page to read is determined
+ by a block parameter - reference to a hash link for this page.
+ If an error occurs the BLOCK_ERROR bit is set in the block status.
+*/
+
+static void read_block(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block,
+ my_bool primary,
+ pagecache_disk_read_validator validator,
+ gptr validator_data)
+{
+ uint got_length;
+
+ /* On entry cache_lock is locked */
+
+ DBUG_ENTER("read_block");
+ if (primary)
+ {
+ /*
+ This code is executed only by threads
+ that submitted primary requests
+ */
+
+ DBUG_PRINT("read_block",
+ ("page to be read by primary request"));
+
+ /* Page is not in buffer yet, is to be read from disk */
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ /*
+ Here other threads may step in and register as secondary readers.
+ They will register in block->wqueue[COND_FOR_REQUESTED].
+ */
+ got_length= pagecache_fread(pagecache, &block->hash_link->file,
+ block->buffer,
+ block->hash_link->pageno, MYF(0));
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ if (got_length < pagecache->block_size)
+ block->status|= BLOCK_ERROR;
+ else
+ block->status= (BLOCK_READ | (block->status & BLOCK_WRLOCK));
+
+ if (validator != NULL &&
+ (*validator)(block->buffer, validator_data))
+ block->status|= BLOCK_ERROR;
+
+ DBUG_PRINT("read_block",
+ ("primary request: new page in cache"));
+ /* Signal that all pending requests for this page now can be processed */
+#ifdef THREAD
+ if (block->wqueue[COND_FOR_REQUESTED].last_thread)
+ wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
+#endif
+ }
+ else
+ {
+ /*
+ This code is executed only by threads
+ that submitted secondary requests
+ */
+ DBUG_PRINT("read_block",
+ ("secondary request waiting for new page to be read"));
+ {
+#ifdef THREAD
+ struct st_my_thread_var *thread= my_thread_var;
+ /* Put the request into a queue and wait until it can be processed */
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
+ do
+ {
+ DBUG_PRINT("read_block: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend,
+ &pagecache->cache_lock);
+ }
+ while (thread->next);
+#else
+ KEYCACHE_DBUG_ASSERT(0);
+ /* No parallel requests in single-threaded case */
+#endif
+ }
+ DBUG_PRINT("read_block",
+ ("secondary request: new page in cache"));
+ }
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Unlock/unpin the page and set the LSN stamp if needed
+
+ SYNOPSIS
+ pagecache_unlock_page()
+ pagecache pointer to a page cache data structure
+ file handler for the file for the block of data to be read
+ pageno number of the block of data in the file
+ lock lock change
+ pin pin page
+ first_REDO_LSN_for_page do not set it if it is zero
+
+ NOTE
+ Pinning uses the request registration mechanism; it works the following way:
+ | beginning | ending |
+ | of func. | of func. |
+ ----------------------------+-------------+---------------+
+ PAGECACHE_PIN_LEFT_PINNED | - | - |
+ PAGECACHE_PIN_LEFT_UNPINNED | reg request | unreg request |
+ PAGECACHE_PIN | reg request | - |
+ PAGECACHE_UNPIN | - | unreg request |
+
+
+*/
+
+void pagecache_unlock_page(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ enum pagecache_page_lock lock,
+ enum pagecache_page_pin pin,
+ LSN first_REDO_LSN_for_page)
+{
+ PAGECACHE_BLOCK_LINK *block;
+ int page_st;
+ DBUG_ENTER("pagecache_unlock_page");
+ DBUG_PRINT("enter", ("fd: %u page: %lu l%s p%s",
+ (uint) file->file, (ulong) pageno,
+ page_cache_page_lock_str[lock],
+ page_cache_page_pin_str[pin]));
+ /* we do not allow any lock/pin increasing here */
+ DBUG_ASSERT(pin != PAGECACHE_PIN &&
+ lock != PAGECACHE_LOCK_READ &&
+ lock != PAGECACHE_LOCK_WRITE);
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ /*
+ The cache must still be usable: we hold a lock on a page in it, and we
+ hold that lock because we are here to unlock the page.
+ */
+ DBUG_ASSERT(pagecache->can_be_used);
+
+ inc_counter_for_resize_op(pagecache);
+ /* See NOTE for pagecache_unlock_page about registering requests */
+ block= find_block(pagecache, file, pageno, 0, 0,
+ test(pin == PAGECACHE_PIN_LEFT_UNPINNED), &page_st);
+ BLOCK_INFO(block);
+ DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
+ if (first_REDO_LSN_for_page)
+ {
+ DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK &&
+ pin == PAGECACHE_UNPIN);
+ set_if_bigger(block->rec_lsn, first_REDO_LSN_for_page);
+ }
+
+#ifndef DBUG_OFF
+ if (
+#endif
+ make_lock_and_pin(pagecache, block, lock, pin)
+#ifndef DBUG_OFF
+ )
+ {
+ DBUG_ASSERT(0); /* should not happen */
+ }
+#else
+ ;
+#endif
+
+ remove_reader(block);
+ /*
+ Link the block into the LRU chain if it's the last submitted request
+ for the block and block will not be pinned.
+ See NOTE for pagecache_unlock_page about registering requests.
+ */
+ if (pin != PAGECACHE_PIN_LEFT_PINNED)
+ unreg_request(pagecache, block, 1);
+
+ dec_counter_for_resize_op(pagecache);
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+
+ DBUG_VOID_RETURN;
+}
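+
+/*
+  Illustrative sketch (not part of this changeset): a caller that earlier
+  took the page with PAGECACHE_LOCK_WRITE / PAGECACHE_PIN could release it
+  like this ('info', 'page' and 'rec_lsn' are hypothetical names):
+
+    pagecache_unlock_page(pagecache, &info->dfile, page,
+                          PAGECACHE_LOCK_WRITE_UNLOCK, PAGECACHE_UNPIN,
+                          rec_lsn);
+
+  Passing 0 as the last argument leaves the block's rec_lsn unchanged.
+*/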
+
+
+/*
+ Unpin page
+
+ SYNOPSIS
+ pagecache_unpin_page()
+ pagecache pointer to a page cache data structure
+ file handler for the file for the block of data to be read
+ pageno number of the block of data in the file
+*/
+
+void pagecache_unpin_page(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno)
+{
+ PAGECACHE_BLOCK_LINK *block;
+ int page_st;
+ DBUG_ENTER("pagecache_unpin_page");
+ DBUG_PRINT("enter", ("fd: %u page: %lu",
+ (uint) file->file, (ulong) pageno));
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ /*
+ The cache must still be usable: we hold a lock on this page (we came
+ here to unlock it), so the cache cannot have been disabled.
+ */
+ DBUG_ASSERT(pagecache->can_be_used);
+
+ inc_counter_for_resize_op(pagecache);
+ /* See NOTE for pagecache_unlock_page about registering requests */
+ block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st);
+ DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
+
+#ifndef DBUG_OFF
+ if (
+#endif
+ /*
+ We can only unpin while keeping the read lock because:
+ a) we can't pin without any lock;
+ b) we can't unpin while keeping the write lock.
+ */
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_LEFT_READLOCKED,
+ PAGECACHE_UNPIN)
+#ifndef DBUG_OFF
+ )
+ {
+ DBUG_ASSERT(0); /* should not happen */
+ }
+#else
+ ;
+#endif
+
+ remove_reader(block);
+ /*
+ Link the block into the LRU chain if it's the last submitted request
+ for the block and block will not be pinned.
+ See NOTE for pagecache_unlock_page about registering requests
+ */
+ unreg_request(pagecache, block, 1);
+
+ dec_counter_for_resize_op(pagecache);
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Unlock/unpin page and put LSN stamp if it need
+ (uses direct block/page pointer)
+
+ SYNOPSIS
+ pagecache_unlock()
+ pagecache pointer to a page cache data structure
+ link direct link to page (returned by read or write)
+ lock lock change
+ pin pin page
+ first_REDO_LSN_for_page do not set it if it is zero
+*/
+
+void pagecache_unlock(PAGECACHE *pagecache,
+ PAGECACHE_PAGE_LINK *link,
+ enum pagecache_page_lock lock,
+ enum pagecache_page_pin pin,
+ LSN first_REDO_LSN_for_page)
+{
+ PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link;
+ DBUG_ENTER("pagecache_unlock");
+ DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu l%s p%s",
+ (ulong) block,
+ (uint) block->hash_link->file.file,
+ (ulong) block->hash_link->pageno,
+ page_cache_page_lock_str[lock],
+ page_cache_page_pin_str[pin]));
+ /*
+ We do not allow any lock/pin increase here. The page also cannot have
+ been left unpinned, because we use a direct link (the block must stay
+ pinned for the link to remain valid).
+ */
+ DBUG_ASSERT(pin != PAGECACHE_PIN &&
+ pin != PAGECACHE_PIN_LEFT_UNPINNED &&
+ lock != PAGECACHE_LOCK_READ &&
+ lock != PAGECACHE_LOCK_WRITE);
+ if (pin == PAGECACHE_PIN_LEFT_UNPINNED &&
+ lock == PAGECACHE_LOCK_READ_UNLOCK)
+ {
+#ifndef DBUG_OFF
+ if (
+#endif
+ /* the block is not needed here, so we do not provide it */
+ make_lock_and_pin(pagecache, 0, lock, pin)
+#ifndef DBUG_OFF
+ )
+ {
+ DBUG_ASSERT(0); /* should not happen */
+ }
+#else
+ ;
+#endif
+ DBUG_VOID_RETURN;
+ }
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ /*
+ The cache must still be usable: we hold a lock on this page (we came
+ here to unlock it), so the cache cannot have been disabled.
+ */
+ DBUG_ASSERT(pagecache->can_be_used);
+
+ inc_counter_for_resize_op(pagecache);
+ if (first_REDO_LSN_for_page)
+ {
+ DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK &&
+ pin == PAGECACHE_UNPIN);
+ set_if_bigger(block->rec_lsn, first_REDO_LSN_for_page);
+ }
+
+#ifndef DBUG_OFF
+ if (
+#endif
+ make_lock_and_pin(pagecache, block, lock, pin)
+#ifndef DBUG_OFF
+ )
+ {
+ DBUG_ASSERT(0); /* should not happen */
+ }
+#else
+ ;
+#endif
+
+ remove_reader(block);
+ /*
+ Link the block into the LRU chain if it's the last submitted request
+ for the block and block will not be pinned.
+ See NOTE for pagecache_unlock_page about registering requests.
+ */
+ if (pin != PAGECACHE_PIN_LEFT_PINNED)
+ unreg_request(pagecache, block, 1);
+
+ dec_counter_for_resize_op(pagecache);
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Unpin page
+ (uses direct block/page pointer)
+
+ SYNOPSIS
+ pagecache_unpin()
+ pagecache pointer to a page cache data structure
+ link direct link to page (returned by read or write)
+*/
+
+void pagecache_unpin(PAGECACHE *pagecache,
+ PAGECACHE_PAGE_LINK *link)
+{
+ PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link;
+ DBUG_ENTER("pagecache_unpin");
+ DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu",
+ (ulong) block,
+ (uint) block->hash_link->file.file,
+ (ulong) block->hash_link->pageno));
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ /*
+ The cache must still be usable: we hold a lock on this page (we came
+ here to unpin it), so the cache cannot have been disabled.
+ */
+ DBUG_ASSERT(pagecache->can_be_used);
+
+ inc_counter_for_resize_op(pagecache);
+
+#ifndef DBUG_OFF
+ if (
+#endif
+ /*
+ We can only unpin while keeping the read lock because:
+ a) we can't pin without any lock;
+ b) we can't unpin while keeping the write lock.
+ */
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_LEFT_READLOCKED,
+ PAGECACHE_UNPIN)
+#ifndef DBUG_OFF
+ )
+ {
+ DBUG_ASSERT(0); /* should not happen */
+ }
+#else
+ ;
+#endif
+
+ remove_reader(block);
+ /*
+ Link the block into the LRU chain if it's the last submitted request
+ for the block and block will not be pinned.
+ See NOTE for pagecache_unlock_page about registering requests.
+ */
+ unreg_request(pagecache, block, 1);
+
+ dec_counter_for_resize_op(pagecache);
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Read a block of data from a cached file into a buffer;
+
+ SYNOPSIS
+ pagecache_valid_read()
+ pagecache pointer to a page cache data structure
+ file handler for the file for the block of data to be read
+ pageno number of the block of data in the file
+ level determines the weight of the data
+ buff buffer to where the data must be placed
+ type type of the page
+ lock lock change
+ link link to the page if we pin it
+ validator validator of read from the disk data
+ validator_data pointer to the data needed by the validator
+
+ RETURN VALUE
+ Returns the address where the data is placed if successful, 0 otherwise.
+
+ The pin will be chosen according to the lock parameter (see lock_to_pin).
+*/
+static enum pagecache_page_pin lock_to_pin[]=
+{
+ PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
+ PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
+ PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
+ PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ*/,
+ PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/,
+ PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
+ PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
+ PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/
+};
+
+byte *pagecache_valid_read(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ uint level,
+ byte *buff,
+ enum pagecache_page_type type,
+ enum pagecache_page_lock lock,
+ PAGECACHE_PAGE_LINK *link,
+ pagecache_disk_read_validator validator,
+ gptr validator_data)
+{
+ int error= 0;
+ enum pagecache_page_pin pin= lock_to_pin[lock];
+ PAGECACHE_PAGE_LINK fake_link;
+ DBUG_ENTER("pagecache_valid_read");
+ DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s l%s p%s",
+ (uint) file->file, (ulong) pageno, level,
+ page_cache_page_type_str[type],
+ page_cache_page_lock_str[lock],
+ page_cache_page_pin_str[pin]));
+
+ if (!link)
+ link= &fake_link;
+ else
+ *link= 0;
+
+restart:
+
+ if (pagecache->can_be_used)
+ {
+ /* Key cache is used */
+ PAGECACHE_BLOCK_LINK *block;
+ uint status;
+ int page_st;
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ if (!pagecache->can_be_used)
+ {
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ goto no_key_cache;
+ }
+
+ inc_counter_for_resize_op(pagecache);
+ pagecache->global_cache_r_requests++;
+ /* See NOTE for pagecache_unlock_page about registering requests. */
+ block= find_block(pagecache, file, pageno, level,
+ test(lock == PAGECACHE_LOCK_WRITE),
+ test((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
+ (pin == PAGECACHE_PIN)),
+ &page_st);
+ DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
+ block->type == type);
+ block->type= type;
+ if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
+ {
+ DBUG_PRINT("info", ("read block 0x%lx", (ulong)block));
+ /* The requested page is to be read into the block buffer */
+ read_block(pagecache, block,
+ (my_bool)(page_st == PAGE_TO_BE_READ),
+ validator, validator_data);
+ DBUG_PRINT("info", ("read is done"));
+ }
+ if (make_lock_and_pin(pagecache, block, lock, pin))
+ {
+ /*
+ We failed to write-lock the block; the cache is unlocked now,
+ so we will try to get the block again.
+ */
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_PRINT("info", ("restarting..."));
+ goto restart;
+ }
+
+ if (! ((status= block->status) & BLOCK_ERROR))
+ {
+#if !defined(SERIALIZED_READ_FROM_CACHE)
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+#endif
+
+ DBUG_ASSERT((pagecache->block_size & 511) == 0);
+ /* Copy data from the cache buffer */
+ bmove512(buff, block->buffer, pagecache->block_size);
+
+#if !defined(SERIALIZED_READ_FROM_CACHE)
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+#endif
+ }
+
+ remove_reader(block);
+ /*
+ Link the block into the LRU chain if it's the last submitted request
+ for the block and block will not be pinned.
+ See NOTE for pagecache_unlock_page about registering requests.
+ */
+ if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
+ unreg_request(pagecache, block, 1);
+ else
+ *link= (PAGECACHE_PAGE_LINK)block;
+
+ dec_counter_for_resize_op(pagecache);
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+
+ if (status & BLOCK_ERROR)
+ DBUG_RETURN((byte *) 0);
+
+ DBUG_RETURN(buff);
+ }
+
+no_key_cache: /* Key cache is not used */
+
+ /* We can't use mutex here as the key cache may not be initialized */
+ pagecache->global_cache_r_requests++;
+ pagecache->global_cache_read++;
+ if (pagecache_fread(pagecache, file, (byte*) buff, pageno, MYF(MY_NABP)))
+ error= 1;
+ DBUG_RETURN(error ? (byte*) 0 : buff);
+}
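+
+/*
+  Illustrative sketch (not from this changeset): reading a page with a
+  validity check; 'info', 'my_page_validator' and 'share' are hypothetical
+  names and the level value is an arbitrary weight. With PAGECACHE_LOCK_WRITE
+  the page stays pinned (see lock_to_pin) and 'link' receives the direct
+  pointer for a later pagecache_unlock()/pagecache_unpin():
+
+    PAGECACHE_PAGE_LINK link;
+    if (!pagecache_valid_read(pagecache, &info->dfile, page, 3, buff,
+                              PAGECACHE_LSN_PAGE, PAGECACHE_LOCK_WRITE,
+                              &link, my_page_validator, (gptr) share))
+      goto err;
+*/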
+
+
+/*
+ Delete page from the buffer
+
+ SYNOPSIS
+ pagecache_delete_page()
+ pagecache pointer to a page cache data structure
+ file handler for the file for the block of data to be read
+ pageno number of the block of data in the file
+ lock lock change
+ flush flush page if it is dirty
+
+ RETURN VALUE
+ 0 - deleted or was not present at all
+ 1 - error
+
+ NOTES.
+ lock can only be PAGECACHE_LOCK_LEFT_WRITELOCKED (the page was write-locked
+ before) or PAGECACHE_LOCK_WRITE (the delete will write-lock the page first)
+*/
+my_bool pagecache_delete_page(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ enum pagecache_page_lock lock,
+ my_bool flush)
+{
+ int error= 0;
+ enum pagecache_page_pin pin= lock_to_pin[lock];
+ DBUG_ENTER("pagecache_delete_page");
+ DBUG_PRINT("enter", ("fd: %u page: %lu l%s p%s",
+ (uint) file->file, (ulong) pageno,
+ page_cache_page_lock_str[lock],
+ page_cache_page_pin_str[pin]));
+ DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE ||
+ lock == PAGECACHE_LOCK_LEFT_WRITELOCKED);
+ DBUG_ASSERT(pin == PAGECACHE_PIN ||
+ pin == PAGECACHE_PIN_LEFT_PINNED);
+
+restart:
+
+ if (pagecache->can_be_used)
+ {
+ /* Key cache is used */
+ reg1 PAGECACHE_BLOCK_LINK *block;
+ PAGECACHE_HASH_LINK **unused_start, *link;
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ if (!pagecache->can_be_used)
+ goto end;
+
+ inc_counter_for_resize_op(pagecache);
+ link= get_present_hash_link(pagecache, file, pageno, &unused_start);
+ if (!link)
+ {
+ DBUG_PRINT("info", ("There is no such page in the cache"));
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_RETURN(0);
+ }
+ block= link->block;
+ /* See NOTE for pagecache_unlock_page about registering requests. */
+ if (pin == PAGECACHE_PIN)
+ reg_requests(pagecache, block, 1);
+ DBUG_ASSERT(block != 0);
+ if (make_lock_and_pin(pagecache, block, lock, pin))
+ {
+ /*
+ We failed to write-lock the block; the cache is unlocked and the last
+ write lock has been released, so we will try to get the block again.
+ */
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_PRINT("info", ("restarting..."));
+ goto restart;
+ }
+
+ if (block->status & BLOCK_CHANGED)
+ {
+ if (flush)
+ {
+ /* The block contains a dirty page - push it out of the cache */
+
+ KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ /*
+ The call is thread safe because only the current
+ thread might change the block->hash_link value
+ */
+ DBUG_ASSERT(block->pins == 1);
+ error= pagecache_fwrite(pagecache,
+ &block->hash_link->file,
+ block->buffer,
+ block->hash_link->pageno,
+ block->type,
+ MYF(MY_NABP | MY_WAIT_IF_FULL));
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ pagecache->global_cache_write++;
+
+ if (error)
+ {
+ block->status|= BLOCK_ERROR;
+ goto err;
+ }
+ }
+ pagecache->blocks_changed--;
+ pagecache->global_blocks_changed--;
+ /*
+ free_block() will change the status and rec_lsn of the block so no
+ need to change them here.
+ */
+ }
+ /* Cache is locked, so we can release the page before freeing it */
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_WRITE_UNLOCK,
+ PAGECACHE_UNPIN);
+ DBUG_ASSERT(link->requests > 0);
+ link->requests--;
+ /* See NOTE for pagecache_unlock_page about registering requests. */
+ free_block(pagecache, block);
+
+err:
+ dec_counter_for_resize_op(pagecache);
+end:
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ }
+
+ DBUG_RETURN(error);
+}
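+
+/*
+  Illustrative sketch (not from this changeset): dropping a page that the
+  caller does not hold locked, flushing it first if it is dirty ('info'
+  and 'page' are hypothetical names):
+
+    if (pagecache_delete_page(pagecache, &info->dfile, page,
+                              PAGECACHE_LOCK_WRITE, 1))
+      goto err;
+*/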
+
+
+/*
+ Write a buffer into a cached file.
+
+ SYNOPSIS
+
+ pagecache_write()
+ pagecache pointer to a page cache data structure
+ file handler for the file to write data to
+ pageno number of the block of data in the file
+ level determines the weight of the data
+ buff buffer with the data to be written
+ type type of the page
+ lock lock change
+ pin pin page
+ write_mode how to write page
+ link link to the page if we pin it
+
+ RETURN VALUE
+ 0 on success, 1 otherwise.
+*/
+
+/* description of how to change lock before and after write */
+struct write_lock_change
+{
+ int need_lock_change; /* need changing of lock at the end of write */
+ enum pagecache_page_lock new_lock; /* lock at the beginning */
+ enum pagecache_page_lock unlock_lock; /* lock at the end */
+};
+
+static struct write_lock_change write_lock_change_table[]=
+{
+ {1,
+ PAGECACHE_LOCK_WRITE,
+ PAGECACHE_LOCK_WRITE_UNLOCK} /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
+ {0, /*unsupported*/
+ PAGECACHE_LOCK_LEFT_UNLOCKED,
+ PAGECACHE_LOCK_LEFT_UNLOCKED} /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
+ {0, PAGECACHE_LOCK_LEFT_WRITELOCKED, 0} /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
+ {1,
+ PAGECACHE_LOCK_WRITE,
+ PAGECACHE_LOCK_WRITE_TO_READ} /*PAGECACHE_LOCK_READ*/,
+ {0, PAGECACHE_LOCK_WRITE, 0} /*PAGECACHE_LOCK_WRITE*/,
+ {0, /*unsupported*/
+ PAGECACHE_LOCK_LEFT_UNLOCKED,
+ PAGECACHE_LOCK_LEFT_UNLOCKED} /*PAGECACHE_LOCK_READ_UNLOCK*/,
+ {1,
+ PAGECACHE_LOCK_LEFT_WRITELOCKED,
+ PAGECACHE_LOCK_WRITE_UNLOCK } /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
+ {1,
+ PAGECACHE_LOCK_LEFT_WRITELOCKED,
+ PAGECACHE_LOCK_WRITE_TO_READ}/*PAGECACHE_LOCK_WRITE_TO_READ*/
+};
+
+/* description of how to change pin before and after write */
+struct write_pin_change
+{
+ enum pagecache_page_pin new_pin; /* pin status at the beginning */
+ enum pagecache_page_pin unlock_pin; /* pin status at the end */
+};
+
+static struct write_pin_change write_pin_change_table[]=
+{
+ {PAGECACHE_PIN_LEFT_PINNED,
+ PAGECACHE_PIN_LEFT_PINNED} /*PAGECACHE_PIN_LEFT_PINNED*/,
+ {PAGECACHE_PIN,
+ PAGECACHE_UNPIN} /*PAGECACHE_PIN_LEFT_UNPINNED*/,
+ {PAGECACHE_PIN,
+ PAGECACHE_PIN_LEFT_PINNED} /*PAGECACHE_PIN*/,
+ {PAGECACHE_PIN_LEFT_PINNED,
+ PAGECACHE_UNPIN} /*PAGECACHE_UNPIN*/
+};
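+
+/*
+  Worked example of how the two tables above combine (derived from the
+  tables themselves, for illustration only): a pagecache_write() call with
+  lock=PAGECACHE_LOCK_READ and pin=PAGECACHE_PIN has need_lock_change=1,
+  so the block is first taken with PAGECACHE_LOCK_WRITE / PAGECACHE_PIN,
+  the data is copied, and the call ends with PAGECACHE_LOCK_WRITE_TO_READ /
+  PAGECACHE_PIN_LEFT_PINNED, i.e. the caller is left holding a read lock
+  on a still-pinned page.
+*/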
+
+my_bool pagecache_write(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ uint level,
+ byte *buff,
+ enum pagecache_page_type type,
+ enum pagecache_page_lock lock,
+ enum pagecache_page_pin pin,
+ enum pagecache_write_mode write_mode,
+ PAGECACHE_PAGE_LINK *link)
+{
+ reg1 PAGECACHE_BLOCK_LINK *block= NULL;
+ PAGECACHE_PAGE_LINK fake_link;
+ int error= 0;
+ int need_lock_change= write_lock_change_table[lock].need_lock_change;
+ DBUG_ENTER("pagecache_write");
+ DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s l%s p%s m%s",
+ (uint) file->file, (ulong) pageno, level,
+ page_cache_page_type_str[type],
+ page_cache_page_lock_str[lock],
+ page_cache_page_pin_str[pin],
+ page_cache_page_write_mode_str[write_mode]));
+ DBUG_ASSERT(lock != PAGECACHE_LOCK_LEFT_READLOCKED &&
+ lock != PAGECACHE_LOCK_READ_UNLOCK);
+ if (!link)
+ link= &fake_link;
+ else
+ *link= 0;
+
+ if (write_mode == PAGECACHE_WRITE_NOW)
+ {
+ /* we allow a direct write only when no long-term locking is used */
+ DBUG_ASSERT(lock == PAGECACHE_LOCK_LEFT_UNLOCKED);
+ /* Force writing from buff into disk */
+ pagecache->global_cache_write++;
+ if (pagecache_fwrite(pagecache, file, buff, pageno, type,
+ MYF(MY_NABP | MY_WAIT_IF_FULL)))
+ DBUG_RETURN(1);
+ }
+restart:
+
+#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
+ DBUG_EXECUTE("check_pagecache",
+ test_key_cache(pagecache, "start of key_cache_write", 1););
+#endif
+
+ if (pagecache->can_be_used)
+ {
+ /* Key cache is used */
+ int page_st;
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ if (!pagecache->can_be_used)
+ {
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ goto no_key_cache;
+ }
+
+ inc_counter_for_resize_op(pagecache);
+ pagecache->global_cache_w_requests++;
+ /* See NOTE for pagecache_unlock_page about registering requests. */
+ block= find_block(pagecache, file, pageno, level,
+ test(write_mode != PAGECACHE_WRITE_DONE &&
+ lock != PAGECACHE_LOCK_LEFT_WRITELOCKED &&
+ lock != PAGECACHE_LOCK_WRITE_UNLOCK &&
+ lock != PAGECACHE_LOCK_WRITE_TO_READ),
+ test((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
+ (pin == PAGECACHE_PIN)),
+ &page_st);
+ if (!block)
+ {
+ DBUG_ASSERT(write_mode != PAGECACHE_WRITE_DONE);
+ /* It happens only for requests submitted during resize operation */
+ dec_counter_for_resize_op(pagecache);
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ /* Write directly to disk; the page cache is being resized at the moment */
+ goto no_key_cache;
+ }
+
+ DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
+ block->type == type);
+ block->type= type;
+
+ if (make_lock_and_pin(pagecache, block,
+ write_lock_change_table[lock].new_lock,
+ (need_lock_change ?
+ write_pin_change_table[pin].new_pin :
+ pin)))
+ {
+ /*
+ We failed to write-lock the block; the cache is unlocked and the last
+ write lock has been released, so we will try to get the block again.
+ */
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_PRINT("info", ("restarting..."));
+ goto restart;
+ }
+
+
+ if (write_mode == PAGECACHE_WRITE_DONE)
+ {
+ if ((block->status & BLOCK_ERROR) && page_st != PAGE_READ)
+ {
+ /* Copy data from buff */
+ bmove512(block->buffer, buff, pagecache->block_size);
+ block->status= (BLOCK_READ | (block->status & BLOCK_WRLOCK));
+ KEYCACHE_DBUG_PRINT("key_cache_insert",
+ ("primary request: new page in cache"));
+#ifdef THREAD
+ /* Signal that all pending requests for this page can now be processed. */
+ if (block->wqueue[COND_FOR_REQUESTED].last_thread)
+ wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
+#endif
+ }
+ }
+ else
+ {
+ if (write_mode == PAGECACHE_WRITE_NOW)
+ {
+ /* buff has been written to disk at start */
+ if ((block->status & BLOCK_CHANGED) &&
+ !(block->status & BLOCK_ERROR))
+ link_to_file_list(pagecache, block, &block->hash_link->file, 1);
+ }
+ else if (! (block->status & BLOCK_CHANGED))
+ link_to_changed_list(pagecache, block);
+
+ if (! (block->status & BLOCK_ERROR))
+ {
+ bmove512(block->buffer, buff, pagecache->block_size);
+ block->status|= BLOCK_READ;
+ }
+ }
+
+
+ if (need_lock_change)
+ {
+#ifndef DBUG_OFF
+ int rc=
+#endif
+ /*
+ QQ: We are doing an unlock here, so need to give the page its rec_lsn
+ */
+ make_lock_and_pin(pagecache, block,
+ write_lock_change_table[lock].unlock_lock,
+ write_pin_change_table[pin].unlock_pin);
+#ifndef DBUG_OFF
+ DBUG_ASSERT(rc == 0);
+#endif
+ }
+
+ /* Unregister the request */
+ DBUG_ASSERT(block->hash_link->requests > 0);
+ block->hash_link->requests--;
+ /* See NOTE for pagecache_unlock_page about registering requests. */
+ if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
+ unreg_request(pagecache, block, 1);
+ else
+ *link= (PAGECACHE_PAGE_LINK)block;
+
+
+ if (block->status & BLOCK_ERROR)
+ error= 1;
+
+ dec_counter_for_resize_op(pagecache);
+
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+
+ goto end;
+ }
+
+no_key_cache:
+ /* Key cache is not used */
+ if (write_mode == PAGECACHE_WRITE_DELAY)
+ {
+ pagecache->global_cache_w_requests++;
+ pagecache->global_cache_write++;
+ if (pagecache_fwrite(pagecache, file, (byte*) buff, pageno, type,
+ MYF(MY_NABP | MY_WAIT_IF_FULL)))
+ error=1;
+ }
+
+end:
+#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
+ DBUG_EXECUTE("exec",
+ test_key_cache(pagecache, "end of key_cache_write", 1););
+#endif
+ BLOCK_INFO(block);
+ DBUG_RETURN(error);
+}
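+
+/*
+  Illustrative sketch (not from this changeset): a delayed write that keeps
+  the page write-locked and pinned for the caller ('info' and 'page' are
+  hypothetical names); the page would later be released with
+  pagecache_unlock() or pagecache_unlock_page():
+
+    PAGECACHE_PAGE_LINK link;
+    if (pagecache_write(pagecache, &info->dfile, page, 3, buff,
+                        PAGECACHE_LSN_PAGE, PAGECACHE_LOCK_WRITE,
+                        PAGECACHE_PIN, PAGECACHE_WRITE_DELAY, &link))
+      goto err;
+*/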
+
+
+/*
+ Free block: remove reference to it from hash table,
+ remove it from the chain file of dirty/clean blocks
+ and add it to the free list.
+*/
+
+static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
+{
+ KEYCACHE_THREAD_TRACE("free block");
+ KEYCACHE_DBUG_PRINT("free_block",
+ ("block %u to be freed, hash_link %p",
+ BLOCK_NUMBER(pagecache, block), block->hash_link));
+ if (block->hash_link)
+ {
+ /*
+ While waiting for readers to finish, new readers might request the
+ block. But since we set block->status|= BLOCK_REASSIGNED, they
+ will wait on block->wqueue[COND_FOR_SAVED]. They must be signalled
+ later.
+ */
+ block->status|= BLOCK_REASSIGNED;
+ wait_for_readers(pagecache, block);
+ unlink_hash(pagecache, block->hash_link);
+ }
+
+ unlink_changed(block);
+ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins == 0);
+ block->status= 0;
+#ifndef DBUG_OFF
+ block->type= PAGECACHE_EMPTY_PAGE;
+#endif
+ block->rec_lsn= 0;
+ KEYCACHE_THREAD_TRACE("free block");
+ KEYCACHE_DBUG_PRINT("free_block",
+ ("block is freed"));
+ unreg_request(pagecache, block, 0);
+ block->hash_link= NULL;
+
+ /* Remove the free block from the LRU ring. */
+ unlink_block(pagecache, block);
+ if (block->temperature == BLOCK_WARM)
+ pagecache->warm_blocks--;
+ block->temperature= BLOCK_COLD;
+ /* Insert the free block in the free list. */
+ block->next_used= pagecache->free_block_list;
+ pagecache->free_block_list= block;
+ /* Keep track of the number of currently unused blocks. */
+ pagecache->blocks_unused++;
+
+#ifdef THREAD
+ /* All pending requests for this page must be resubmitted. */
+ if (block->wqueue[COND_FOR_SAVED].last_thread)
+ wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
+#endif
+}
+
+
+static int cmp_sec_link(PAGECACHE_BLOCK_LINK **a, PAGECACHE_BLOCK_LINK **b)
+{
+ return (((*a)->hash_link->pageno < (*b)->hash_link->pageno) ? -1 :
+ ((*a)->hash_link->pageno > (*b)->hash_link->pageno) ? 1 : 0);
+}
+
+
+/*
+ Flush a portion of changed blocks to disk,
+ free used blocks if requested
+*/
+
+static int flush_cached_blocks(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ PAGECACHE_BLOCK_LINK **cache,
+ PAGECACHE_BLOCK_LINK **end,
+ enum flush_type type)
+{
+ int error;
+ int last_errno= 0;
+ uint count= (uint) (end-cache);
+ DBUG_ENTER("flush_cached_blocks");
+
+ /* Don't lock the cache during the flush */
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ /*
+ As all blocks referred to in 'cache' are marked with BLOCK_IN_FLUSH,
+ we are guaranteed that no thread will change them.
+ */
+ qsort((byte*) cache, count, sizeof(*cache), (qsort_cmp) cmp_sec_link);
+
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ for (; cache != end; cache++)
+ {
+ PAGECACHE_BLOCK_LINK *block= *cache;
+
+ if (block->pins)
+ {
+ KEYCACHE_DBUG_PRINT("flush_cached_blocks",
+ ("block %u (0x%lx) pinned",
+ BLOCK_NUMBER(pagecache, block), (ulong)block));
+ DBUG_PRINT("info", ("block %u (0x%lx) pinned",
+ BLOCK_NUMBER(pagecache, block), (ulong)block));
+ BLOCK_INFO(block);
+ last_errno= -1;
+ unreg_request(pagecache, block, 1);
+ continue;
+ }
+ /* if the block is not pinned then it is not write locked */
+ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins == 0);
+#ifndef DBUG_OFF
+ {
+ int rc=
+#endif
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_WRITE, PAGECACHE_PIN);
+#ifndef DBUG_OFF
+ DBUG_ASSERT(rc == 0);
+ }
+#endif
+
+ KEYCACHE_DBUG_PRINT("flush_cached_blocks",
+ ("block %u (0x%lx) to be flushed",
+ BLOCK_NUMBER(pagecache, block), (ulong)block));
+ DBUG_PRINT("info", ("block %u (0x%lx) to be flushed",
+ BLOCK_NUMBER(pagecache, block), (ulong)block));
+ BLOCK_INFO(block);
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_PRINT("info", ("block %u (0x%lx) pins: %u",
+ BLOCK_NUMBER(pagecache, block), (ulong)block,
+ block->pins));
+ DBUG_ASSERT(block->pins == 1);
+ error= pagecache_fwrite(pagecache, file,
+ block->buffer,
+ block->hash_link->pageno,
+ block->type,
+ MYF(MY_NABP | MY_WAIT_IF_FULL));
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_WRITE_UNLOCK,
+ PAGECACHE_UNPIN);
+
+ pagecache->global_cache_write++;
+ if (error)
+ {
+ block->status|= BLOCK_ERROR;
+ if (!last_errno)
+ last_errno= errno ? errno : -1;
+ }
+#ifdef THREAD
+ /*
+ Let possible waiting requests to write to the block page proceed.
+ This can happen only during an operation to resize the key cache.
+ */
+ if (block->wqueue[COND_FOR_SAVED].last_thread)
+ wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
+#endif
+ /* type will never be FLUSH_IGNORE_CHANGED here */
+ if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
+ {
+ pagecache->blocks_changed--;
+ pagecache->global_blocks_changed--;
+ free_block(pagecache, block);
+ }
+ else
+ {
+ block->status&= ~BLOCK_IN_FLUSH;
+ link_to_file_list(pagecache, block, file, 1);
+ unreg_request(pagecache, block, 1);
+ }
+ }
+ DBUG_RETURN(last_errno);
+}
+
+
+/*
+ flush all key blocks for a file to disk, but don't do any mutex locks
+
+ flush_pagecache_blocks_int()
+ pagecache pointer to a key cache data structure
+ file handler for the file to flush to
+ flush_type type of the flush
+
+ NOTES
+ This function doesn't do any mutex locks because it needs to be called
+ both from flush_pagecache_blocks and flush_all_key_blocks (the latter one
+ does the mutex lock in the resize_pagecache() function).
+
+ RETURN
+ 0 ok
+ 1 error
+*/
+
+static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ enum flush_type type)
+{
+ PAGECACHE_BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
+ int last_errno= 0;
+ DBUG_ENTER("flush_pagecache_blocks_int");
+ DBUG_PRINT("enter",("file: %d blocks_used: %lu blocks_changed: %lu",
+ file->file, pagecache->blocks_used, pagecache->blocks_changed));
+
+#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
+ DBUG_EXECUTE("check_pagecache",
+ test_key_cache(pagecache,
+ "start of flush_pagecache_blocks", 0););
+#endif
+
+ cache= cache_buff;
+ if (pagecache->disk_blocks > 0 &&
+ (!my_disable_flush_pagecache_blocks || type != FLUSH_KEEP))
+ {
+ /* Key cache exists and flush is not disabled */
+ int error= 0;
+ uint count= 0;
+ PAGECACHE_BLOCK_LINK **pos, **end;
+ PAGECACHE_BLOCK_LINK *first_in_switch= NULL;
+ PAGECACHE_BLOCK_LINK *block, *next;
+#if defined(PAGECACHE_DEBUG)
+ uint cnt= 0;
+#endif
+
+ if (type != FLUSH_IGNORE_CHANGED)
+ {
+ /*
+ Count how many key blocks we have to cache to be able
+ to flush all dirty pages with minimum seek moves
+ */
+ for (block= pagecache->changed_blocks[FILE_HASH(*file)] ;
+ block;
+ block= block->next_changed)
+ {
+ if (block->hash_link->file.file == file->file)
+ {
+ count++;
+ KEYCACHE_DBUG_ASSERT(count<= pagecache->blocks_used);
+ }
+ }
+ /* Allocate a new buffer only if it's bigger than the one we have */
+ if (count > FLUSH_CACHE &&
+ !(cache=
+ (PAGECACHE_BLOCK_LINK**)
+ my_malloc(sizeof(PAGECACHE_BLOCK_LINK*)*count, MYF(0))))
+ {
+ cache= cache_buff;
+ count= FLUSH_CACHE;
+ }
+ }
+
+ /* Retrieve the blocks and write them to a buffer to be flushed */
+restart:
+ end= (pos= cache)+count;
+ for (block= pagecache->changed_blocks[FILE_HASH(*file)] ;
+ block;
+ block= next)
+ {
+#if defined(PAGECACHE_DEBUG)
+ cnt++;
+ KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
+#endif
+ next= block->next_changed;
+ if (block->hash_link->file.file == file->file)
+ {
+ /*
+ Mark the block with BLOCK_IN_FLUSH in order not to let
+ other threads use it for new pages and interfere with
+ our sequence of flushing dirty file pages
+ */
+ block->status|= BLOCK_IN_FLUSH;
+
+ if (! (block->status & BLOCK_IN_SWITCH))
+ {
+ /*
+ We care only for the blocks for which flushing was not
+ initiated by other threads as a result of page swapping
+ */
+ reg_requests(pagecache, block, 1);
+ if (type != FLUSH_IGNORE_CHANGED)
+ {
+ /* It's not a temporary file */
+ if (pos == end)
+ {
+ /*
+ This happens only if there is not enough
+ memory for the big block
+ */
+ if ((error= flush_cached_blocks(pagecache, file, cache,
+ end,type)))
+ last_errno=error;
+ DBUG_PRINT("info", ("restarting..."));
+ /*
+ Restart the scan as some other thread might have changed
+ the changed blocks chain: the blocks that were in switch
+ state before the flush started have to be excluded
+ */
+ goto restart;
+ }
+ *pos++= block;
+ }
+ else
+ {
+ /* It's a temporary file */
+ pagecache->blocks_changed--;
+ pagecache->global_blocks_changed--;
+ free_block(pagecache, block);
+ }
+ }
+ else
+ {
+ /* Link the block into a list of blocks 'in switch' */
+ /* QQ:
+ #warning this unlink_changed() is a serious problem for
+ Maria's Checkpoint: it removes a page from the list of dirty
+ pages, while it's still dirty. A solution is to abandon
+ first_in_switch, just wait for this page to be
+ flushed by somebody else, and loop. TODO: check all places
+ where we remove a page from the list of dirty pages
+ */
+ unlink_changed(block);
+ link_changed(block, &first_in_switch);
+ }
+ }
+ }
+ if (pos != cache)
+ {
+ if ((error= flush_cached_blocks(pagecache, file, cache, pos, type)))
+ last_errno= error;
+ }
+ /* Wait until list of blocks in switch is empty */
+ while (first_in_switch)
+ {
+#if defined(PAGECACHE_DEBUG)
+ cnt= 0;
+#endif
+ block= first_in_switch;
+ {
+#ifdef THREAD
+ struct st_my_thread_var *thread= my_thread_var;
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
+ do
+ {
+ KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait",
+ ("suspend thread %ld", thread->id));
+ pagecache_pthread_cond_wait(&thread->suspend,
+ &pagecache->cache_lock);
+ }
+ while (thread->next);
+#else
+ KEYCACHE_DBUG_ASSERT(0);
+ /* No parallel requests in single-threaded case */
+#endif
+ }
+#if defined(PAGECACHE_DEBUG)
+ cnt++;
+ KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
+#endif
+ }
+ /* The following happens very seldom */
+ if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
+ {
+#if defined(PAGECACHE_DEBUG)
+ cnt=0;
+#endif
+ for (block= pagecache->file_blocks[FILE_HASH(*file)] ;
+ block;
+ block= next)
+ {
+#if defined(PAGECACHE_DEBUG)
+ cnt++;
+ KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
+#endif
+ next= block->next_changed;
+ if (block->hash_link->file.file == file->file &&
+ (! (block->status & BLOCK_CHANGED)
+ || type == FLUSH_IGNORE_CHANGED))
+ {
+ reg_requests(pagecache, block, 1);
+ free_block(pagecache, block);
+ }
+ }
+ }
+ }
+
+#ifndef DBUG_OFF
+ DBUG_EXECUTE("check_pagecache",
+ test_key_cache(pagecache, "end of flush_pagecache_blocks", 0););
+#endif
+ if (cache != cache_buff)
+ my_free((gptr) cache, MYF(0));
+ if (last_errno)
+ errno=last_errno; /* Return first error */
+ DBUG_RETURN(last_errno != 0);
+}
+
+
+/*
+ Flush all blocks for a file to disk
+
+ SYNOPSIS
+
+ flush_pagecache_blocks()
+ pagecache pointer to a page cache data structure
+ file handler for the file to flush to
+ flush_type type of the flush
+
+ RETURN
+ 0 ok
+ 1 error
+*/
+
+int flush_pagecache_blocks(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file, enum flush_type type)
+{
+ int res;
+ DBUG_ENTER("flush_pagecache_blocks");
+ DBUG_PRINT("enter", ("pagecache: 0x%lx", (long) pagecache));
+
+ if (pagecache->disk_blocks <= 0)
+ DBUG_RETURN(0);
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+ inc_counter_for_resize_op(pagecache);
+ res= flush_pagecache_blocks_int(pagecache, file, type);
+ dec_counter_for_resize_op(pagecache);
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_RETURN(res);
+}
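+
+/*
+  Illustrative sketch (not from this changeset): flushing all dirty pages of
+  a table's data file before closing it ('info' is a hypothetical handler):
+
+    if (flush_pagecache_blocks(pagecache, &info->dfile, FLUSH_KEEP))
+      error= 1;
+*/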
+
+
+/*
+ Reset the counters of a key cache.
+
+ SYNOPSIS
+ reset_pagecache_counters()
+ name the name of a key cache
+ pagecache pointer to the pagecache to be reset
+
+ DESCRIPTION
+ This procedure is used to reset the counters of all currently used key
+ caches, both the default one and the named ones.
+
+ RETURN
+ 0 on success (always because it can't fail)
+*/
+
+int reset_pagecache_counters(const char *name, PAGECACHE *pagecache)
+{
+ DBUG_ENTER("reset_pagecache_counters");
+ if (!pagecache->inited)
+ {
+ DBUG_PRINT("info", ("Key cache %s not initialized.", name));
+ DBUG_RETURN(0);
+ }
+ DBUG_PRINT("info", ("Resetting counters for key cache %s.", name));
+
+ pagecache->global_blocks_changed= 0; /* Key_blocks_not_flushed */
+ pagecache->global_cache_r_requests= 0; /* Key_read_requests */
+ pagecache->global_cache_read= 0; /* Key_reads */
+ pagecache->global_cache_w_requests= 0; /* Key_write_requests */
+ pagecache->global_cache_write= 0; /* Key_writes */
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Allocates a buffer and stores in it some information about all dirty pages
+ of type PAGECACHE_LSN_PAGE.
+
+ SYNOPSIS
+ pagecache_collect_changed_blocks_with_lsn()
+ pagecache pointer to the page cache
+ str (OUT) pointer to a LEX_STRING where the allocated buffer, and
+ its size, will be put
+ max_lsn (OUT) pointer to a LSN where the maximum rec_lsn of all
+ relevant dirty pages will be put
+
+ DESCRIPTION
+ Does the allocation because the caller cannot know the size itself.
+ Memory freeing is to be done by the caller (if the "str" member of the
+ LEX_STRING is not NULL).
+ Ignores all pages of a type other than PAGECACHE_LSN_PAGE, because they
+ are not interesting for a checkpoint record.
+ The caller has the intention of doing checkpoints.
+
+ RETURN
+ 0 on success
+ 1 on error
+*/
+my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
+ LEX_STRING *str,
+ LSN *max_lsn)
+{
+ my_bool error;
+ ulong stored_list_size= 0;
+ uint file_hash;
+ char *ptr;
+ DBUG_ENTER("pagecache_collect_changed_blocks_with_LSN");
+
+ *max_lsn= 0;
+ DBUG_ASSERT(NULL == str->str);
+ /*
+ We lock the entire cache but will be quick, just reading/writing a few MBs
+ of memory at most.
+ When we enter here, we must be sure that no "first_in_switch" situation
+ is happening or will happen (either we have to get rid of
+ first_in_switch in the code or, first_in_switch has to increment a
+ "danger" counter for this function to know it has to wait). TODO.
+ */
+ pagecache_pthread_mutex_lock(&pagecache->cache_lock);
+
+ /* Count how many dirty pages are interesting */
+ for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++)
+ {
+ PAGECACHE_BLOCK_LINK *block;
+ for (block= pagecache->changed_blocks[file_hash] ;
+ block;
+ block= block->next_changed)
+ {
+ /*
+ Q: is there something subtle with block->hash_link: can it be NULL?
+ does it have to be == hash_link->block... ?
+ */
+ DBUG_ASSERT(block->hash_link != NULL);
+ DBUG_ASSERT(block->status & BLOCK_CHANGED);
+ if (block->type != PAGECACHE_LSN_PAGE)
+ continue; /* no need to store it */
+ /*
+ In the current pagecache, rec_lsn is not set correctly:
+ 1) it is set on pagecache_unlock(), too late (a page is dirty
+ (BLOCK_CHANGED) since the first pagecache_write()). So in this
+ scenario:
+ thread1: thread2:
+ write_REDO
+ pagecache_write() checkpoint : reclsn not known
+ pagecache_unlock(sets rec_lsn)
+ commit
+ crash,
+ at recovery we will wrongly skip the REDO. It also affects the
+ low-water mark's computation.
+ 2) sometimes the unlocking can be an implicit action of
+ pagecache_write(), without any call to pagecache_unlock(), then
+ rec_lsn is not set.
+ 1) and 2) are critical problems.
+ TODO: fix this when Monty has explained how he writes BLOB pages.
+ */
+ if (block->rec_lsn == 0)
+ {
+ DBUG_ASSERT(0);
+ goto err;
+ }
+ stored_list_size++;
+ }
+ }
+
+ str->length= 8+(4+4+8)*stored_list_size;
+ if (NULL == (str->str= my_malloc(str->length, MYF(MY_WME))))
+ goto err;
+ ptr= str->str;
+ int8store(ptr, stored_list_size);
+ ptr+= 8;
+ if (0 == stored_list_size)
+ goto end;
+ for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++)
+ {
+ PAGECACHE_BLOCK_LINK *block;
+ for (block= pagecache->changed_blocks[file_hash] ;
+ block;
+ block= block->next_changed)
+ {
+ if (block->type != PAGECACHE_LSN_PAGE)
+ continue; /* no need to store it in the checkpoint record */
+ DBUG_ASSERT((4 == sizeof(block->hash_link->file.file)) &&
+ (4 == sizeof(block->hash_link->pageno)));
+ int4store(ptr, block->hash_link->file.file);
+ ptr+= 4;
+ int4store(ptr, block->hash_link->pageno);
+ ptr+= 4;
+ int8store(ptr, (ulonglong) block->rec_lsn);
+ ptr+= 8;
+ set_if_bigger(*max_lsn, block->rec_lsn);
+ }
+ }
+ error= 0;
+ goto end;
+err:
+ error= 1;
+end:
+ pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_RETURN(error);
+}
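+
+/*
+  Illustrative sketch (not from this changeset) of how a checkpoint writer
+  could walk the buffer built above: 8 bytes of entry count followed by
+  (4 bytes file, 4 bytes pageno, 8 bytes rec_lsn) per entry; uint4korr/
+  uint8korr are the usual mysys unpacking macros:
+
+    char *ptr= str.str;
+    ulonglong i, count= uint8korr(ptr);
+    ptr+= 8;
+    for (i= 0; i < count; i++, ptr+= 4 + 4 + 8)
+    {
+      File file=         (File) uint4korr(ptr);
+      ulong pageno=      (ulong) uint4korr(ptr + 4);
+      ulonglong rec_lsn= uint8korr(ptr + 4 + 4);
+      ...
+    }
+*/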
+
+
+#ifndef DBUG_OFF
+/*
+ Test if disk-cache is ok
+*/
+static void test_key_cache(PAGECACHE *pagecache __attribute__((unused)),
+ const char *where __attribute__((unused)),
+ my_bool lock __attribute__((unused)))
+{
+ /* TODO */
+}
+#endif
+
+#if defined(PAGECACHE_TIMEOUT)
+
+#define KEYCACHE_DUMP_FILE "pagecache_dump.txt"
+#define MAX_QUEUE_LEN 100
+
+
+static void pagecache_dump(PAGECACHE *pagecache)
+{
+ FILE *pagecache_dump_file=fopen(KEYCACHE_DUMP_FILE, "w");
+ struct st_my_thread_var *last;
+ struct st_my_thread_var *thread;
+ PAGECACHE_BLOCK_LINK *block;
+ PAGECACHE_HASH_LINK *hash_link;
+ PAGECACHE_PAGE *page;
+ uint i;
+
+ fprintf(pagecache_dump_file, "thread:%u\n", thread->id);
+
+ i=0;
+ thread=last=waiting_for_hash_link.last_thread;
+ fprintf(pagecache_dump_file, "queue of threads waiting for hash link\n");
+ if (thread)
+ do
+ {
+ thread= thread->next;
+ page= (PAGECACHE_PAGE *) thread->opt_info;
+ fprintf(pagecache_dump_file,
+ "thread:%u, (file,pageno)=(%u,%lu)\n",
+ thread->id,(uint) page->file.file,(ulong) page->pageno);
+ if (++i == MAX_QUEUE_LEN)
+ break;
+ }
+ while (thread != last);
+
+ i=0;
+ thread=last=waiting_for_block.last_thread;
+ fprintf(pagecache_dump_file, "queue of threads waiting for block\n");
+ if (thread)
+ do
+ {
+ thread=thread->next;
+ hash_link= (PAGECACHE_HASH_LINK *) thread->opt_info;
+ fprintf(pagecache_dump_file,
+ "thread:%u hash_link:%u (file,pageno)=(%u,%lu)\n",
+ thread->id, (uint) PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link),
+ (uint) hash_link->file.file,(ulong) hash_link->pageno);
+ if (++i == MAX_QUEUE_LEN)
+ break;
+ }
+ while (thread != last);
+
+ for (i=0 ; i < pagecache->blocks_used ; i++)
+ {
+ int j;
+ block= &pagecache->block_root[i];
+ hash_link= block->hash_link;
+ fprintf(pagecache_dump_file,
+ "block:%u hash_link:%d status:%x #requests=%u waiting_for_readers:%d\n",
+ i, (int) (hash_link ?
+ PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link) :
+ -1),
+ block->status, block->requests, block->condvar ? 1 : 0);
+ for (j=0 ; j < COND_SIZE; j++)
+ {
+ PAGECACHE_WQUEUE *wqueue=&block->wqueue[j];
+ thread= last= wqueue->last_thread;
+ fprintf(pagecache_dump_file, "queue #%d\n", j);
+ if (thread)
+ {
+ do
+ {
+ thread=thread->next;
+ fprintf(pagecache_dump_file,
+ "thread:%u\n", thread->id);
+ if (++i == MAX_QUEUE_LEN)
+ break;
+ }
+ while (thread != last);
+ }
+ }
+ }
+ fprintf(pagecache_dump_file, "LRU chain:");
+ block= pagecache->used_last;
+ if (block)
+ {
+ do
+ {
+ block= block->next_used;
+ fprintf(pagecache_dump_file,
+ "block:%u, ", BLOCK_NUMBER(pagecache, block));
+ }
+ while (block != pagecache->used_last);
+ }
+ fprintf(pagecache_dump_file, "\n");
+
+ fclose(pagecache_dump_file);
+}
+
+#endif /* defined(PAGECACHE_TIMEOUT) */
+
+#if defined(PAGECACHE_TIMEOUT) && !defined(__WIN__)
+
+
+static int pagecache_pthread_cond_wait(pthread_cond_t *cond,
+ pthread_mutex_t *mutex)
+{
+ int rc;
+ struct timeval now; /* time when we started waiting */
+ struct timespec timeout; /* timeout value for the wait function */
+ struct timezone tz;
+#if defined(PAGECACHE_DEBUG)
+ int cnt=0;
+#endif
+
+ /* Get current time */
+ gettimeofday(&now, &tz);
+ /* Prepare timeout value */
+ timeout.tv_sec= now.tv_sec + PAGECACHE_TIMEOUT;
+ /*
+ timeval uses microseconds.
+ timespec uses nanoseconds.
+ 1 microsecond = 1000 nanoseconds
+ */
+ timeout.tv_nsec= now.tv_usec * 1000;
+ KEYCACHE_THREAD_TRACE_END("started waiting");
+#if defined(PAGECACHE_DEBUG)
+ cnt++;
+ if (cnt % 100 == 0)
+ fprintf(pagecache_debug_log, "waiting...\n");
+ fflush(pagecache_debug_log);
+#endif
+ rc= pthread_cond_timedwait(cond, mutex, &timeout);
+ KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");
+ if (rc == ETIMEDOUT || rc == ETIME)
+ {
+#if defined(PAGECACHE_DEBUG)
+ fprintf(pagecache_debug_log,"aborted by pagecache timeout\n");
+ fclose(pagecache_debug_log);
+ abort();
+#endif
+ pagecache_dump();
+ }
+
+#if defined(PAGECACHE_DEBUG)
+ KEYCACHE_DBUG_ASSERT(rc != ETIMEDOUT);
+#else
+ assert(rc != ETIMEDOUT);
+#endif
+ return rc;
+}
+#else
+#if defined(PAGECACHE_DEBUG)
+static int pagecache_pthread_cond_wait(pthread_cond_t *cond,
+ pthread_mutex_t *mutex)
+{
+ int rc;
+ KEYCACHE_THREAD_TRACE_END("started waiting");
+ rc= pthread_cond_wait(cond, mutex);
+ KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");
+ return rc;
+}
+#endif
+#endif /* defined(PAGECACHE_TIMEOUT) && !defined(__WIN__) */
+
+#if defined(PAGECACHE_DEBUG)
+static int ___pagecache_pthread_mutex_lock(pthread_mutex_t *mutex)
+{
+ int rc;
+ rc= pthread_mutex_lock(mutex);
+ KEYCACHE_THREAD_TRACE_BEGIN("");
+ return rc;
+}
+
+
+static void ___pagecache_pthread_mutex_unlock(pthread_mutex_t *mutex)
+{
+ KEYCACHE_THREAD_TRACE_END("");
+ pthread_mutex_unlock(mutex);
+}
+
+
+static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond)
+{
+ int rc;
+ KEYCACHE_THREAD_TRACE("signal");
+ rc= pthread_cond_signal(cond);
+ return rc;
+}
+
+
+#if defined(PAGECACHE_DEBUG_LOG)
+
+
+static void pagecache_debug_print(const char * fmt, ...)
+{
+ va_list args;
+ va_start(args,fmt);
+ if (pagecache_debug_log)
+ {
+ VOID(vfprintf(pagecache_debug_log, fmt, args));
+ VOID(fputc('\n',pagecache_debug_log));
+ }
+ va_end(args);
+}
+#endif /* defined(PAGECACHE_DEBUG_LOG) */
+
+#if defined(PAGECACHE_DEBUG_LOG)
+
+
+void pagecache_debug_log_close(void)
+{
+ if (pagecache_debug_log)
+ fclose(pagecache_debug_log);
+}
+#endif /* defined(PAGECACHE_DEBUG_LOG) */
+
+#endif /* defined(PAGECACHE_DEBUG) */
diff --git a/mysys/my_atomic.c b/mysys/my_atomic.c
index 6a30267eb80..aa04d55f624 100644
--- a/mysys/my_atomic.c
+++ b/mysys/my_atomic.c
@@ -17,11 +17,10 @@
#include <my_pthread.h>
#ifndef HAVE_INLINE
-/*
- the following will cause all inline functions to be instantiated
-*/
+/* the following will cause all inline functions to be instantiated */
#define HAVE_INLINE
-#define static extern
+#undef STATIC_INLINE
+#define STATIC_INLINE extern
#endif
#include <my_atomic.h>
@@ -35,7 +34,7 @@
*/
int my_atomic_initialize()
{
- DBUG_ASSERT(sizeof(intptr) == sizeof(void *));
+ compile_time_assert(sizeof(intptr) == sizeof(void *));
/* currently the only thing worth checking is SMP/UP issue */
#ifdef MY_ATOMIC_MODE_DUMMY
return my_getncpus() == 1 ? MY_ATOMIC_OK : MY_ATOMIC_NOT_1CPU;
diff --git a/mysys/my_bit.c b/mysys/my_bit.c
index 5a9b1187c83..2881eb1ebd2 100644
--- a/mysys/my_bit.c
+++ b/mysys/my_bit.c
@@ -13,23 +13,18 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-/* Some useful bit functions */
+#include <my_global.h>
-#include "mysys_priv.h"
-
-/*
- Find smallest X in 2^X >= value
- This can be used to divide a number with value by doing a shift instead
-*/
+#ifndef HAVE_INLINE
+/* the following will cause all inline functions to be instantiated */
+#define HAVE_INLINE
+#undef STATIC_INLINE
+#define STATIC_INLINE extern
+#endif
-uint my_bit_log2(ulong value)
-{
- uint bit;
- for (bit=0 ; value > 1 ; value>>=1, bit++) ;
- return bit;
-}
+#include <my_bit.h>
-static char nbits[256] = {
+const char _my_bits_nbits[256] = {
0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
@@ -48,60 +43,29 @@ static char nbits[256] = {
4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8,
};
-uint my_count_bits(ulonglong v)
-{
-#if SIZEOF_LONG_LONG > 4
- /* The following code is a bit faster on 16 bit machines than if we would
- only shift v */
- ulong v2=(ulong) (v >> 32);
- return (uint) (uchar) (nbits[(uchar) v] +
- nbits[(uchar) (v >> 8)] +
- nbits[(uchar) (v >> 16)] +
- nbits[(uchar) (v >> 24)] +
- nbits[(uchar) (v2)] +
- nbits[(uchar) (v2 >> 8)] +
- nbits[(uchar) (v2 >> 16)] +
- nbits[(uchar) (v2 >> 24)]);
-#else
- return (uint) (uchar) (nbits[(uchar) v] +
- nbits[(uchar) (v >> 8)] +
- nbits[(uchar) (v >> 16)] +
- nbits[(uchar) (v >> 24)]);
-#endif
-}
-
-uint my_count_bits_ushort(ushort v)
-{
- return nbits[v];
-}
-
-
/*
- Next highest power of two
-
- SYNOPSIS
- my_round_up_to_next_power()
- v Value to check
-
- RETURN
- Next or equal power of 2
- Note: 0 will return 0
-
- NOTES
- Algorithm by Sean Anderson, according to:
- http://graphics.stanford.edu/~seander/bithacks.html
- (Orignal code public domain)
-
- Comments shows how this works with 01100000000000000000000000001011
+ perl -e 'print map{", 0x".unpack H2,pack B8,unpack b8,chr$_}(0..255)'
*/
+const uchar _my_bits_reverse_table[256]={
+0x00, 0x80, 0x40, 0xC0, 0x20, 0xA0, 0x60, 0xE0, 0x10, 0x90, 0x50, 0xD0, 0x30,
+0xB0, 0x70, 0xF0, 0x08, 0x88, 0x48, 0xC8, 0x28, 0xA8, 0x68, 0xE8, 0x18, 0x98,
+0x58, 0xD8, 0x38, 0xB8, 0x78, 0xF8, 0x04, 0x84, 0x44, 0xC4, 0x24, 0xA4, 0x64,
+0xE4, 0x14, 0x94, 0x54, 0xD4, 0x34, 0xB4, 0x74, 0xF4, 0x0C, 0x8C, 0x4C, 0xCC,
+0x2C, 0xAC, 0x6C, 0xEC, 0x1C, 0x9C, 0x5C, 0xDC, 0x3C, 0xBC, 0x7C, 0xFC, 0x02,
+0x82, 0x42, 0xC2, 0x22, 0xA2, 0x62, 0xE2, 0x12, 0x92, 0x52, 0xD2, 0x32, 0xB2,
+0x72, 0xF2, 0x0A, 0x8A, 0x4A, 0xCA, 0x2A, 0xAA, 0x6A, 0xEA, 0x1A, 0x9A, 0x5A,
+0xDA, 0x3A, 0xBA, 0x7A, 0xFA, 0x06, 0x86, 0x46, 0xC6, 0x26, 0xA6, 0x66, 0xE6,
+0x16, 0x96, 0x56, 0xD6, 0x36, 0xB6, 0x76, 0xF6, 0x0E, 0x8E, 0x4E, 0xCE, 0x2E,
+0xAE, 0x6E, 0xEE, 0x1E, 0x9E, 0x5E, 0xDE, 0x3E, 0xBE, 0x7E, 0xFE, 0x01, 0x81,
+0x41, 0xC1, 0x21, 0xA1, 0x61, 0xE1, 0x11, 0x91, 0x51, 0xD1, 0x31, 0xB1, 0x71,
+0xF1, 0x09, 0x89, 0x49, 0xC9, 0x29, 0xA9, 0x69, 0xE9, 0x19, 0x99, 0x59, 0xD9,
+0x39, 0xB9, 0x79, 0xF9, 0x05, 0x85, 0x45, 0xC5, 0x25, 0xA5, 0x65, 0xE5, 0x15,
+0x95, 0x55, 0xD5, 0x35, 0xB5, 0x75, 0xF5, 0x0D, 0x8D, 0x4D, 0xCD, 0x2D, 0xAD,
+0x6D, 0xED, 0x1D, 0x9D, 0x5D, 0xDD, 0x3D, 0xBD, 0x7D, 0xFD, 0x03, 0x83, 0x43,
+0xC3, 0x23, 0xA3, 0x63, 0xE3, 0x13, 0x93, 0x53, 0xD3, 0x33, 0xB3, 0x73, 0xF3,
+0x0B, 0x8B, 0x4B, 0xCB, 0x2B, 0xAB, 0x6B, 0xEB, 0x1B, 0x9B, 0x5B, 0xDB, 0x3B,
+0xBB, 0x7B, 0xFB, 0x07, 0x87, 0x47, 0xC7, 0x27, 0xA7, 0x67, 0xE7, 0x17, 0x97,
+0x57, 0xD7, 0x37, 0xB7, 0x77, 0xF7, 0x0F, 0x8F, 0x4F, 0xCF, 0x2F, 0xAF, 0x6F,
+0xEF, 0x1F, 0x9F, 0x5F, 0xDF, 0x3F, 0xBF, 0x7F, 0xFF
+};
-uint32 my_round_up_to_next_power(uint32 v)
-{
- v--; /* 01100000000000000000000000001010 */
- v|= v >> 1; /* 01110000000000000000000000001111 */
- v|= v >> 2; /* 01111100000000000000000000001111 */
- v|= v >> 4; /* 01111111110000000000000000001111 */
- v|= v >> 8; /* 01111111111111111100000000001111 */
- v|= v >> 16; /* 01111111111111111111111111111111 */
- return v+1; /* 10000000000000000000000000000000 */
-}
diff --git a/mysys/my_bitmap.c b/mysys/my_bitmap.c
index 10eff40b9ed..e127b2584ae 100644
--- a/mysys/my_bitmap.c
+++ b/mysys/my_bitmap.c
@@ -38,6 +38,7 @@
#include "mysys_priv.h"
#include <my_bitmap.h>
#include <m_string.h>
+#include <my_bit.h>
void create_last_word_mask(MY_BITMAP *map)
{
diff --git a/mysys/my_create.c b/mysys/my_create.c
index 5639459f5a9..13e4675ee66 100644
--- a/mysys/my_create.c
+++ b/mysys/my_create.c
@@ -52,6 +52,13 @@ File my_create(const char *FileName, int CreateFlags, int access_flags,
fd = open(FileName, access_flags);
#endif
+ if ((MyFlags & MY_SYNC_DIR) && (fd >=0) &&
+ my_sync_dir_by_file(FileName, MyFlags))
+ {
+ my_close(fd, MyFlags);
+ fd= -1;
+ }
+
DBUG_RETURN(my_register_filename(fd, FileName, FILE_BY_CREATE,
EE_CANTCREATEFILE, MyFlags));
} /* my_create */
diff --git a/mysys/my_delete.c b/mysys/my_delete.c
index bac3e2513e1..14374fd3fa8 100644
--- a/mysys/my_delete.c
+++ b/mysys/my_delete.c
@@ -29,6 +29,9 @@ int my_delete(const char *name, myf MyFlags)
my_error(EE_DELETE,MYF(ME_BELL+ME_WAITTANG+(MyFlags & ME_NOINPUT)),
name,errno);
}
+ else if ((MyFlags & MY_SYNC_DIR) &&
+ my_sync_dir_by_file(name, MyFlags))
+ err= -1;
DBUG_RETURN(err);
} /* my_delete */
diff --git a/mysys/my_getsystime.c b/mysys/my_getsystime.c
index 2fd7eed7778..8f7cc5b7029 100644
--- a/mysys/my_getsystime.c
+++ b/mysys/my_getsystime.c
@@ -34,10 +34,6 @@ ulonglong my_getsystime()
LARGE_INTEGER t_cnt;
if (!offset)
{
- /* strictly speaking there should be a mutex to protect
- initialization section. But my_getsystime() is called from
- UUID() code, and UUID() calls are serialized with a mutex anyway
- */
LARGE_INTEGER li;
FILETIME ft;
GetSystemTimeAsFileTime(&ft);
diff --git a/mysys/my_handler.c b/mysys/my_handler.c
index afc44cc2838..757cbe490f8 100644
--- a/mysys/my_handler.c
+++ b/mysys/my_handler.c
@@ -16,9 +16,11 @@
MA 02111-1307, USA */
#include <my_global.h>
-#include "my_handler.h"
+#include <m_ctype.h>
+#include <my_base.h>
+#include <my_handler.h>
-int mi_compare_text(CHARSET_INFO *charset_info, uchar *a, uint a_length,
+int ha_compare_text(CHARSET_INFO *charset_info, uchar *a, uint a_length,
uchar *b, uint b_length, my_bool part_key,
my_bool skip_end_space)
{
@@ -174,7 +176,7 @@ int ha_key_cmp(register HA_KEYSEG *keyseg, register uchar *a,
next_key_length=key_length-b_length-pack_length;
if (piks &&
- (flag=mi_compare_text(keyseg->charset,a,a_length,b,b_length,
+ (flag=ha_compare_text(keyseg->charset,a,a_length,b,b_length,
(my_bool) ((nextflag & SEARCH_PREFIX) &&
next_key_length <= 0),
(my_bool)!(nextflag & SEARCH_PREFIX))))
@@ -187,7 +189,7 @@ int ha_key_cmp(register HA_KEYSEG *keyseg, register uchar *a,
{
uint length=(uint) (end-a), a_length=length, b_length=length;
if (piks &&
- (flag= mi_compare_text(keyseg->charset, a, a_length, b, b_length,
+ (flag= ha_compare_text(keyseg->charset, a, a_length, b, b_length,
(my_bool) ((nextflag & SEARCH_PREFIX) &&
next_key_length <= 0),
(my_bool)!(nextflag & SEARCH_PREFIX))))
@@ -235,7 +237,7 @@ int ha_key_cmp(register HA_KEYSEG *keyseg, register uchar *a,
next_key_length=key_length-b_length-pack_length;
if (piks &&
- (flag= mi_compare_text(keyseg->charset,a,a_length,b,b_length,
+ (flag= ha_compare_text(keyseg->charset,a,a_length,b,b_length,
(my_bool) ((nextflag & SEARCH_PREFIX) &&
next_key_length <= 0),
(my_bool) ((nextflag & (SEARCH_FIND |
@@ -482,12 +484,15 @@ end:
DESCRIPTION
Find the first NULL value in index-suffix values tuple.
- TODO Consider optimizing this fuction or its use so we don't search for
- NULL values in completely NOT NULL index suffixes.
+
+ TODO
+ Consider optimizing this function or its use so we don't search for
+ NULL values in completely NOT NULL index suffixes.
RETURN
- First key part that has NULL as value in values tuple, or the last key part
- (with keyseg->type==HA_TYPE_END) if values tuple doesn't contain NULLs.
+ First key part that has NULL as value in values tuple, or the last key
+ part (with keyseg->type==HA_TYPE_END) if values tuple doesn't contain
+ NULLs.
*/
HA_KEYSEG *ha_find_null(HA_KEYSEG *keyseg, uchar *a)
diff --git a/mysys/my_init.c b/mysys/my_init.c
index 7784c09d9d6..e8a55fdc1e6 100644
--- a/mysys/my_init.c
+++ b/mysys/my_init.c
@@ -43,6 +43,7 @@ static void netware_init();
my_bool my_init_done= 0;
uint mysys_usage_id= 0; /* Incremented for each my_init() */
+ulong my_thread_stack_size= 65536;
static ulong atoi_octal(const char *str)
{
diff --git a/mysys/my_open.c b/mysys/my_open.c
index 21bdedddc48..6fe7883b99b 100644
--- a/mysys/my_open.c
+++ b/mysys/my_open.c
@@ -161,6 +161,7 @@ File my_register_filename(File fd, const char *FileName, enum file_type
}
pthread_mutex_unlock(&THR_LOCK_open);
(void) my_close(fd, MyFlags);
+ fd= -1;
my_errno=ENOMEM;
}
else
diff --git a/mysys/my_pread.c b/mysys/my_pread.c
index 7a09c21e039..2b9a994299f 100644
--- a/mysys/my_pread.c
+++ b/mysys/my_pread.c
@@ -51,7 +51,7 @@ uint my_pread(File Filedes, byte *Buffer, uint Count, my_off_t offset,
if (!error) /* Seek was successful */
{
if ((readbytes = (uint) read(Filedes, Buffer, Count)) == -1L)
- my_errno= errno;
+ my_errno= errno ? errno : -1;
/*
We should seek back, even if read failed. If this fails,
@@ -67,7 +67,7 @@ uint my_pread(File Filedes, byte *Buffer, uint Count, my_off_t offset,
#else
if ((error= ((readbytes =
(uint) pread(Filedes, Buffer, Count, offset)) != Count)))
- my_errno= errno;
+ my_errno= errno ? errno : -1;
#endif
if (error || readbytes != Count)
{
@@ -87,8 +87,10 @@ uint my_pread(File Filedes, byte *Buffer, uint Count, my_off_t offset,
my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
my_filename(Filedes),my_errno);
else if (MyFlags & (MY_NABP | MY_FNABP))
+ {
my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
my_filename(Filedes),my_errno);
+ }
}
if ((int) readbytes == -1 || (MyFlags & (MY_FNABP | MY_NABP)))
DBUG_RETURN(MY_FILE_ERROR); /* Return with error */
@@ -158,7 +160,8 @@ uint my_pwrite(int Filedes, const byte *Buffer, uint Count, my_off_t offset,
Count-=writenbytes;
offset+=writenbytes;
}
- DBUG_PRINT("error",("Write only %d bytes",writenbytes));
+ DBUG_PRINT("error",("Write only %d bytes, error: %d",
+ writenbytes, my_errno));
#ifndef NO_BACKGROUND
#ifdef THREAD
if (my_thread_var->abort)
diff --git a/mysys/my_rename.c b/mysys/my_rename.c
index 6a6aa6a5796..64dbac955ea 100644
--- a/mysys/my_rename.c
+++ b/mysys/my_rename.c
@@ -16,8 +16,9 @@
#include "mysys_priv.h"
#include <my_dir.h>
#include "mysys_err.h"
-
+#include "m_string.h"
#undef my_rename
+
/* On unix rename deletes to file if it exists */
int my_rename(const char *from, const char *to, myf MyFlags)
@@ -60,5 +61,18 @@ int my_rename(const char *from, const char *to, myf MyFlags)
if (MyFlags & (MY_FAE+MY_WME))
my_error(EE_LINK, MYF(ME_BELL+ME_WAITTANG),from,to,my_errno);
}
+ else if (MyFlags & MY_SYNC_DIR)
+ {
+#ifdef NEED_EXPLICIT_SYNC_DIR
+ /* do only the needed amount of syncs: */
+ char dir_from[FN_REFLEN], dir_to[FN_REFLEN];
+ dirname_part(dir_from, from);
+ dirname_part(dir_to, to);
+ if (my_sync_dir(dir_from, MyFlags) ||
+ (strcmp(dir_from, dir_to) &&
+ my_sync_dir(dir_to, MyFlags)))
+ error= -1;
+#endif
+ }
DBUG_RETURN(error);
} /* my_rename */
diff --git a/mysys/my_symlink.c b/mysys/my_symlink.c
index 810c0c72632..98059ccd508 100644
--- a/mysys/my_symlink.c
+++ b/mysys/my_symlink.c
@@ -84,6 +84,8 @@ int my_symlink(const char *content, const char *linkname, myf MyFlags)
if (MyFlags & MY_WME)
my_error(EE_CANT_SYMLINK, MYF(0), linkname, content, errno);
}
+ else if ((MyFlags & MY_SYNC_DIR) && my_sync_dir_by_file(linkname, MyFlags))
+ result= -1;
DBUG_RETURN(result);
#endif /* HAVE_READLINK */
}
diff --git a/mysys/my_sync.c b/mysys/my_sync.c
index 64fce3aac21..ab3fc89e0d3 100644
--- a/mysys/my_sync.c
+++ b/mysys/my_sync.c
@@ -48,6 +48,16 @@ int my_sync(File fd, myf my_flags)
do
{
+#if defined(F_FULLFSYNC)
+ /*
+ In Mac OS X >= 10.3 this call is safer than fsync() (it forces the
+ disk's cache and guarantees ordered writes).
+ */
+ if (!(res= fcntl(fd, F_FULLFSYNC, 0)))
+ break; /* ok */
+ /* Some file systems don't support F_FULLFSYNC and fail above: */
+ DBUG_PRINT("info",("fcntl(F_FULLFSYNC) failed, falling back"));
+#endif
#if defined(HAVE_FDATASYNC)
res= fdatasync(fd);
#elif defined(HAVE_FSYNC)
@@ -55,6 +65,7 @@ int my_sync(File fd, myf my_flags)
#elif defined(__WIN__)
res= _commit(fd);
#else
+#error Cannot find a way to sync a file, durability in danger
res= 0; /* No sync (strange OS) */
#endif
} while (res == -1 && errno == EINTR);
@@ -66,10 +77,78 @@ int my_sync(File fd, myf my_flags)
my_errno= -1; /* Unknown error */
if ((my_flags & MY_IGNORE_BADFD) &&
(er == EBADF || er == EINVAL || er == EROFS))
+ {
+ DBUG_PRINT("info", ("ignoring errno %d", er));
res= 0;
+ }
else if (my_flags & MY_WME)
my_error(EE_SYNC, MYF(ME_BELL+ME_WAITTANG), my_filename(fd), my_errno);
}
DBUG_RETURN(res);
} /* my_sync */
+
+static const char cur_dir_name[]= {FN_CURLIB, 0};
+/*
+ Force directory information to disk.
+
+ SYNOPSIS
+ my_sync_dir()
+ dir_name the name of the directory
+ my_flags flags (MY_WME etc)
+
+ RETURN
+ 0 if ok, !=0 if error
+*/
+int my_sync_dir(const char *dir_name, myf my_flags)
+{
+#ifdef NEED_EXPLICIT_SYNC_DIR
+ File dir_fd;
+ int res= 0;
+ const char *correct_dir_name;
+ DBUG_ENTER("my_sync_dir");
+ DBUG_PRINT("my",("Dir: '%s' my_flags: %d", dir_name, my_flags));
+ /* Sometimes the path does not contain an explicit directory */
+ correct_dir_name= (dir_name[0] == 0) ? cur_dir_name : dir_name;
+ /*
+ Syncing a dir may give EINVAL on tmpfs on Linux, which is ok.
+ EIO on the other hand is very important. Hence MY_IGNORE_BADFD.
+ */
+ if ((dir_fd= my_open(correct_dir_name, O_RDONLY, MYF(my_flags))) >= 0)
+ {
+ if (my_sync(dir_fd, MYF(my_flags | MY_IGNORE_BADFD)))
+ res= 2;
+ if (my_close(dir_fd, MYF(my_flags)))
+ res= 3;
+ }
+ else
+ res= 1;
+ DBUG_RETURN(res);
+#else
+ return 0;
+#endif
+}
+
+
+/*
+ Force directory information to disk.
+
+ SYNOPSIS
+ my_sync_dir_by_file()
+ file_name the name of a file in the directory
+ my_flags flags (MY_WME etc)
+
+ RETURN
+ 0 if ok, !=0 if error
+*/
+int my_sync_dir_by_file(const char *file_name, myf my_flags)
+{
+#ifdef NEED_EXPLICIT_SYNC_DIR
+ char dir_name[FN_REFLEN];
+ dirname_part(dir_name, file_name);
+ return my_sync_dir(dir_name, my_flags);
+#else
+ return 0;
+#endif
+}
+
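The new my_sync_dir()/my_sync_dir_by_file() helpers make it possible to force a directory entry to disk after creating or renaming a file, which syncing the file alone does not guarantee on platforms that define NEED_EXPLICIT_SYNC_DIR (the same macro gates both helpers, so they are cheap no-ops elsewhere). A hedged sketch of the intended call pattern, assuming the helper name create_durable_file() and the file name are illustrative and not part of mysys:

#include "my_global.h"
#include "my_sys.h"

/* Sketch only: create a file and make both its contents and its
   directory entry durable before returning. */
static int create_durable_file(const char *name)
{
  File fd= my_create(name, 0, O_RDWR, MYF(MY_WME));
  if (fd < 0)
    return 1;                                  /* my_errno already set */
  if (my_sync(fd, MYF(MY_WME)) ||              /* flush the file itself */
      my_close(fd, MYF(MY_WME)) ||
      my_sync_dir_by_file(name, MYF(MY_WME)))  /* flush its directory */
    return 1;
  return 0;
}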
diff --git a/mysys/wqueue.c b/mysys/wqueue.c
new file mode 100644
index 00000000000..28e044ff606
--- /dev/null
+++ b/mysys/wqueue.c
@@ -0,0 +1,167 @@
+
+#include <wqueue.h>
+
+#define STRUCT_PTR(TYPE, MEMBER, a) \
+ (TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER))
+/*
+ Link a thread into double-linked queue of waiting threads.
+
+ SYNOPSIS
+ wqueue_link_into_queue()
+ wqueue pointer to the queue structure
+ thread pointer to the thread to be added to the queue
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ The queue is represented by a circular list of thread structures.
+ The list is double-linked, of the type (**prev, *next), and is
+ accessed through a pointer to its last element.
+*/
+
+void wqueue_link_into_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
+{
+ struct st_my_thread_var *last;
+ if (!(last= wqueue->last_thread))
+ {
+ /* Queue is empty */
+ thread->next= thread;
+ thread->prev= &thread->next;
+ }
+ else
+ {
+ thread->prev= last->next->prev;
+ last->next->prev= &thread->next;
+ thread->next= last->next;
+ last->next= thread;
+ }
+ wqueue->last_thread= thread;
+}
+
+
+/*
+ Add a thread to single-linked queue of waiting threads
+
+ SYNOPSIS
+ wqueue_add_to_queue()
+ wqueue pointer to the queue structure
+ thread pointer to the thread to be added to the queue
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ The queue is represented by a circular list of thread structures.
+ The list is single-linked, of the type (*next), and is accessed
+ through a pointer to its last element.
+*/
+
+void wqueue_add_to_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
+{
+ struct st_my_thread_var *last;
+ if (!(last= wqueue->last_thread))
+ thread->next= thread;
+ else
+ {
+ thread->next= last->next;
+ last->next= thread;
+ }
+ wqueue->last_thread= thread;
+}
+
+/*
+ Unlink a thread from double-linked queue of waiting threads
+
+ SYNOPSIS
+ wqueue_unlink_from_queue()
+ wqueue pointer to the queue structure
+ thread pointer to the thread to be removed from the queue
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ See NOTES for link_into_queue
+*/
+
+void wqueue_unlink_from_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
+{
+ if (thread->next == thread)
+ /* The queue contains only one member */
+ wqueue->last_thread= NULL;
+ else
+ {
+ thread->next->prev= thread->prev;
+ *thread->prev= thread->next;
+ if (wqueue->last_thread == thread)
+ wqueue->last_thread= STRUCT_PTR(struct st_my_thread_var, next,
+ thread->prev);
+ }
+ thread->next= NULL;
+}
+
+
+/*
+ Remove all threads from queue signaling them to proceed
+
+ SYNOPSIS
+ wqueue_release_queue()
+ wqueue pointer to the queue structure
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ See the notes for wqueue_add_to_queue.
+ When removed from the queue, each thread is signaled via the
+ condition variable thread->suspend.
+*/
+
+void wqueue_release_queue(WQUEUE *wqueue)
+{
+ struct st_my_thread_var *last= wqueue->last_thread;
+ struct st_my_thread_var *next= last->next;
+ struct st_my_thread_var *thread;
+ do
+ {
+ thread= next;
+ pthread_cond_signal(&thread->suspend);
+ next= thread->next;
+ thread->next= NULL;
+ }
+ while (thread != last);
+ wqueue->last_thread= NULL;
+}
+
+
+/*
+ Add thread and wait
+
+ SYNOPSIS
+ wqueue_add_and_wait()
+ wqueue queue to add to
+ thread thread which is waiting
+ lock mutex needed for the operation (must be held by the caller)
+*/
+
+void wqueue_add_and_wait(WQUEUE *wqueue,
+ struct st_my_thread_var *thread, pthread_mutex_t *lock)
+{
+ DBUG_ENTER("wqueue_add_and_wait");
+ DBUG_PRINT("enter", ("thread 0x%lx cond 0x%lx, mutex 0x%lx",
+ (ulong) thread, (ulong) &thread->suspend, (ulong) lock));
+ wqueue_add_to_queue(wqueue, thread);
+ do
+ {
+ DBUG_PRINT("info", ("wait... cond 0x%lx, mutex 0x%lx",
+ (ulong) &thread->suspend, (ulong) lock));
+ pthread_cond_wait(&thread->suspend, lock);
+ DBUG_PRINT("info", ("wait done cond 0x%lx, mutex 0x%lx, next 0x%lx",
+ (ulong) &thread->suspend, (ulong) lock,
+ (ulong) thread->next));
+ }
+ while (thread->next);
+ DBUG_VOID_RETURN;
+}
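The waiting protocol implemented above is: a waiter links itself into the queue and sleeps on its own thread->suspend condition variable, while wqueue_release_queue() wakes every queued thread and marks each one as released by resetting thread->next to NULL, which is exactly the condition wqueue_add_and_wait() loops on. A hedged sketch of both sides; the resource_ready flag, the mutex and the two function names are illustrative and not part of the patch:

#include <my_global.h>
#include <my_pthread.h>
#include <wqueue.h>

static WQUEUE waiters;
static pthread_mutex_t lock;           /* assumed initialised elsewhere */
static my_bool resource_ready= FALSE;

/* Waiting side: called with nothing held; blocks until the resource is
   published. wqueue_add_and_wait() expects 'lock' to be held and releases
   it while sleeping, like pthread_cond_wait() does. */
static void wait_for_resource(void)
{
  pthread_mutex_lock(&lock);
  while (!resource_ready)
    wqueue_add_and_wait(&waiters, my_thread_var, &lock);
  pthread_mutex_unlock(&lock);
}

/* Releasing side: publish the resource and wake every waiter.
   wqueue_release_queue() must only be called on a non-empty queue,
   as it dereferences last_thread->next. */
static void publish_resource(void)
{
  pthread_mutex_lock(&lock);
  resource_ready= TRUE;
  if (waiters.last_thread)
    wqueue_release_queue(&waiters);
  pthread_mutex_unlock(&lock);
}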