author     unknown <monty@mysql.com/narttu.mysql.fi>  2007-05-29 20:13:56 +0300
committer  unknown <monty@mysql.com/narttu.mysql.fi>  2007-05-29 20:13:56 +0300
commit     8f39541e7d8ba812d1198af5d4179ba44d6693fa (patch)
tree       fb321fb70d8c6fa75698d9bedf4218927421686e
parent     486fd0a22ae11ed6bc1f425e7ab7fcdb03679b20 (diff)
download   mariadb-git-8f39541e7d8ba812d1198af5d4179ba44d6693fa.tar.gz
This patch is a collection of patches from Sanja, Sergei and Monty.
Added logging and pinning of pages to block format.
Integration of transaction manager, log handler.
Better page cache integration
Split trnman.h into two files, so that we don't have to include my_atomic.h into C++ programs.
Renaming of structures, more comments, more debugging etc.
Fixed problem with small head block + long varchar.
Added extra argument to delete_record() and update_record() (needed for UNDO logging)
Small changes to interface of pagecache and log handler.
Changed initialization of log_record_type_descriptors so that it does not depend on enum order.
Use array of LEX_STRING's to send data to log handler
Added 'dummy' transaction option to MARIA_INFO so that we can always assume 'trn' exists.
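For illustration, the new LEX_STRING-based logging interface looks roughly like this in use; the fragment is condensed from the write_tail() change in the ma_blockrec.c hunk further down, so the names come from that diff:

    uchar      log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
    LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
    LSN        lsn;

    /* fixed-size part: file id, page number and directory position */
    fileid_store(log_data, info->dfile.file);
    page_store(log_data + FILEID_STORE_SIZE, block->page);
    dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, row_pos.rownr);

    /* variable-size part: the row data itself */
    log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    (char*) log_data;
    log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    (char*) row_pos.data;
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;

    if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_TAIL,
                              info->trn->short_id, NULL, share,
                              sizeof(log_data) + length,
                              TRANSLOG_INTERNAL_PARTS + 2, log_array))
      DBUG_RETURN(1);                       /* log write failed */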
include/lf.h:
Interface fixes
Rename of structures
(Patch from Sergei via Sanja)
include/my_atomic.h:
More comments
include/my_global.h:
Added MY_ERRPTR
include/pagecache.h:
Added undo LSN when unlocking pages
mysql-test/r/maria.result:
Updated results
mysql-test/t/maria.test:
Added autocommit around lock tables
(Patch from Sanja)
mysys/lf_alloc-pin.c:
Post-review fixes, simple optimizations
More comments
Struct slot renames
Check amount of memory on stack
(Patch from Sergei)
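The pin-then-verify loop that the new comments in lf_alloc-pin.c describe can be sketched as follows (a minimal reader-side fragment; the loop and LF_BACKOFF usage mirror lfind() in lf_hash.c, the helper name is made up):

    /* hypothetical helper: pin the head of a shared lock-free list */
    static LF_SLIST *pin_list_head(LF_SLIST * volatile *head, LF_PINS *pins)
    {
      LF_SLIST *node;
      do
      {
        node= *head;                           /* 1. copy the shared pointer */
        _lf_pin(pins, 0, node);                /* 2. pin the copied value    */
      } while (node != *head && LF_BACKOFF);   /* 3. retry if *head moved    */
      return node;                             /* safe to dereference now;
                                                  caller unpins with _lf_unpin(pins, 0) */
    }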
mysys/lf_dynarray.c:
More comments
mysys/lf_hash.c:
More comments
After review fixes
(Patch from Sergei)
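With the after-review fixes, lf_hash calls can now report out-of-memory, so a caller is expected to check for MY_ERRPTR and release the search pin with the new lf_hash_search_unpin(); a hedged usage sketch (handle_oom() and use_record() are placeholders):

    void *rec= lf_hash_search(&hash, pins, key, keylen);
    if (rec == MY_ERRPTR)
      handle_oom();                     /* allocation inside the hash failed       */
    else if (rec)
    {
      use_record(rec);                  /* record stays valid while pin 2 is held  */
      lf_hash_search_unpin(pins);       /* new macro: lf_unpin(pins, 2)            */
    }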
storage/maria/ha_maria.cc:
Split trnman.h into two files, so that we don't have to include my_atomic.h into the .cc program.
(Temporary fix to avoid bug in gcc)
Move out all dereferencing of the transaction structure.
Transaction manager integrated (Patch from Sergei)
storage/maria/ha_maria.h:
Added prototype for start_stmt()
storage/maria/lockman.c:
Function call rename
storage/maria/ma_bitmap.c:
Mark deleted pages free from page cache
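Concretely, _ma_bitmap_free_full_pages() now purges the freed extent from the page cache before clearing the bitmap bits; the call as it appears in the ma_bitmap.c hunk below:

    if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page,
                               page_count, PAGECACHE_LOCK_WRITE, 1))
      DBUG_RETURN(1);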
storage/maria/ma_blockrec.c:
Offset -> rownr
More debugging
Fixed problem with small head block + long varchar
Added logging of changed pages
Added logging of undo (including only logging of changed fields in case of update)
Added pinning/unpinning of all changed pages
More comments
Added free_full_pages() as the same code was used in several places.
fill_rows_parts() renamed to fill_insert_undo_parts()
Added some optimizations for non-transactional tables
_ma_update_block_record() has a new parameter, as we need the original row to do efficient undo for update
storage/maria/ma_blockrec.h:
Added ROW_EXTENTS_ON_STACK
Changed prototype for update and delete of row
storage/maria/ma_check.c:
Added original row to delete_record() call
storage/maria/ma_control_file.h:
Added ifdefs for C++
storage/maria/ma_delete.c:
Added original row to delete_record() call
(Needed for efficient undo logging)
storage/maria/ma_dynrec.c:
Added extra argument to delete_record() and update_record()
Removed unused variable
storage/maria/ma_init.c:
Initialize log handler
storage/maria/ma_loghandler.c:
Removed unused variable
Changed initialization of log_record_type_descriptors so that it does not depend on enum order
Use array of LEX_STRING's to send data to log handler
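A sketch of the order-independent initialization style (all names here are hypothetical; the real table and descriptors live in ma_loghandler.c): each descriptor is stored at its own enum index, so reordering the enum cannot misalign the table the way a positional array initializer could:

    /* hypothetical sketch, not the actual ma_loghandler.c code */
    static LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];

    static void hypothetical_init_descriptors(void)
    {
      log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_HEAD]= redo_insert_head_desc;
      log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_TAIL]= redo_insert_tail_desc;
      /* ... one assignment per LOGREC_* value ... */
    }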
storage/maria/ma_loghandler.h:
New defines
Use array of LEX_STRING's to send data to log handler
storage/maria/ma_open.c:
Added 'dummy' transaction option to MARIA_INFO so that we can always assume 'trn' exists.
Store in MARIA_SHARE->page_type whether pages will have up-to-date LSNs
storage/maria/ma_pagecache.c:
Don't decrease number of readers when using pagecache_write()/pagecache_read()
In pagecache_write() decrement request count if page was left pinned
Added pagecache_delete_pages()
Removed some casts
Make trace output consistent with rest of code
Simplify calling of DBUG_ASSERT(0)
Only update LSN if the LSN is bigger than what's already on the page
Added LSN parameter to pagecache_unpin_page(), pagecache_unpin() and pagecache_unlock()
(Part of patch from Sanja)
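How the new LSN argument flows into unpinning, condensed from _ma_unpin_all_pages() in the ma_blockrec.c hunk below (pages are unpinned in the reverse of pin order; undo_lsn is 0 when a disk error means no undo should be written):

    MARIA_PINNED_PAGE *page_link= (MARIA_PINNED_PAGE*)
      dynamic_array_ptr(&info->pinned_pages, 0);
    MARIA_PINNED_PAGE *pinned_page= page_link + info->pinned_pages.elements;

    while (pinned_page-- != page_link)
      pagecache_unlock(info->s->pagecache, pinned_page->link,
                       pinned_page->unlock, PAGECACHE_UNPIN,
                       0, undo_lsn);         /* first_REDO_LSN_for_page, lsn */
    info->pinned_pages.elements= 0;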
storage/maria/ma_static.c:
Added 'dummy' transaction option to MARIA_INFO so that we can always assume 'trn' exists.
Added default page cache
storage/maria/ma_statrec.c:
Added extra argument to delete_record() and update_record()
storage/maria/ma_test1.c:
Added option -T for transactions
storage/maria/ma_test2.c:
Added option -T for transactions
storage/maria/ma_test_all.sh:
Test with transactions
storage/maria/ma_update.c:
Changed prototype for update of row
storage/maria/maria_def.h:
Changed prototype for update & delete of row as block records need to access the old row
Store in MARIA_SHARE->page_type whether pages will have up-to-date LSNs
Added MARIA_MAX_TREE_LEVELS to allow us to calculate the number of possible pinned pages we may need.
Removed unused 'empty_bits_buffer'
Added pointer to transaction object
Added array for pinned pages
Added log_row_parts array for logging of field data.
Added MARIA_PINNED_PAGE to store pinned pages
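What MARIA_PINNED_PAGE has to carry can be inferred from its use in the ma_blockrec.c hunk; a hypothetical sketch (the actual definition is in maria_def.h, which is not part of the excerpted diff):

    /* hypothetical sketch of the pinned-page bookkeeping entry */
    typedef struct st_maria_pinned_page
    {
      PAGECACHE_PAGE_LINK link;          /* link filled in by pagecache_read()/write() */
      enum pagecache_page_lock unlock;   /* unlock mode to apply at unpin time         */
    } MARIA_PINNED_PAGE;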
storage/maria/trnman.c:
Added accessor functions to transaction object
Added missing DBUG_RETURN()
More debugging
More comments
Changed code commented out with // to be disabled with #ifdef NOT_USED
Transaction manager integrated.
Post review fixes
Part of patch originally from Sergei
storage/maria/trnman.h:
Split trnman.h into two files, so that we don't have to include my_atomic.h into the .cc program.
(Temporary fix to avoid bug in gcc)
storage/maria/unittest/ma_pagecache_single.c:
Added missing argument
Added SKIP_BIG_TESTS
(Patch from Sanja)
storage/maria/unittest/ma_test_loghandler-t.c:
Test logging with new LEX_STRING parameter
(Patch from Sanja)
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
Test logging with new LEX_STRING parameter
(Patch from Sanja)
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
Test logging with new LEX_STRING parameter
(Patch from Sanja)
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
Test logging with new LEX_STRING parameter
(Patch from Sanja)
storage/maria/unittest/trnman-t.c:
Stack overflow detection
(Patch from Sergei)
unittest/unit.pl:
Command-line options --big and --verbose
(Patch from Sergei)
unittest/mytap/tap.c:
Detect --big
(Patch from Sergei)
unittest/mytap/tap.h:
Added skip_big_tests flag and SKIP_BIG_TESTS macro
(Patch from Sergei)
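Intended use of the new mytap facility, inferred from the unit-test changes (a sketch; plan() and ok() are existing mytap calls, the test bodies are placeholders):

    plan(2);
    ok(small_case(), "always runs");
    SKIP_BIG_TESTS(1)
    {
      ok(big_case(), "runs only when --big is given");
    }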
storage/maria/trnman_public.h:
New BitKeeper file ``storage/maria/trnman_public.h''
43 files changed, 2614 insertions, 812 deletions
diff --git a/include/lf.h b/include/lf.h index 39a47e6568a..4712f9c4862 100644 --- a/include/lf.h +++ b/include/lf.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2006 MySQL AB +/* Copyright (C) 2007 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -96,26 +96,28 @@ nolock_wrap(lf_dynarray_iterate, int, #define LF_PINBOX_PINS 4 #define LF_PURGATORY_SIZE 10 -typedef void lf_pinbox_free_func(void *, void *); +typedef void lf_pinbox_free_func(void *, void *, void*); typedef struct { - LF_DYNARRAY pinstack; + LF_DYNARRAY pinarray; lf_pinbox_free_func *free_func; void *free_func_arg; uint free_ptr_offset; uint32 volatile pinstack_top_ver; /* this is a versioned pointer */ - uint32 volatile pins_in_stack; /* number of elements in array */ + uint32 volatile pins_in_array; /* number of elements in array */ } LF_PINBOX; typedef struct { void * volatile pin[LF_PINBOX_PINS]; LF_PINBOX *pinbox; + void *stack_ends_here; void *purgatory; uint32 purgatory_count; uint32 volatile link; /* we want sizeof(LF_PINS) to be 128 to avoid false sharing */ char pad[128-sizeof(uint32)*2 -sizeof(LF_PINBOX *) + -sizeof(void*) -sizeof(void *)*(LF_PINBOX_PINS+1)]; } LF_PINS; @@ -124,9 +126,9 @@ typedef struct { (e.g. lf_hash). */ #define lf_rwlock_by_pins(PINS) \ - my_atomic_rwlock_wrlock(&(PINS)->pinbox->pinstack.lock) + my_atomic_rwlock_wrlock(&(PINS)->pinbox->pinarray.lock) #define lf_rwunlock_by_pins(PINS) \ - my_atomic_rwlock_wrunlock(&(PINS)->pinbox->pinstack.lock) + my_atomic_rwlock_wrunlock(&(PINS)->pinbox->pinarray.lock) /* compile-time assert, to require "no less than N" pins @@ -135,9 +137,9 @@ typedef struct { */ #if defined(__GNUC__) && defined(MY_LF_EXTRA_DEBUG) #define LF_REQUIRE_PINS(N) \ - static const char require_pins[LF_PINBOX_PINS-N] __attribute__ ((unused)); \ - static const int LF_NUM_PINS_IN_THIS_FILE= N - + static const char require_pins[LF_PINBOX_PINS-N] \ + __attribute__ ((unused)); \ + static const int LF_NUM_PINS_IN_THIS_FILE= N; #define _lf_pin(PINS, PIN, ADDR) \ ( \ assert(PIN < LF_NUM_PINS_IN_THIS_FILE), \ @@ -164,17 +166,17 @@ void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset, void lf_pinbox_destroy(LF_PINBOX *pinbox); lock_wrap(lf_pinbox_get_pins, LF_PINS *, - (LF_PINBOX *pinbox), - (pinbox), - &pinbox->pinstack.lock) + (LF_PINBOX *pinbox, void *stack_end), + (pinbox, stack_end), + &pinbox->pinarray.lock); lock_wrap_void(lf_pinbox_put_pins, (LF_PINS *pins), (pins), - &pins->pinbox->pinstack.lock) + &pins->pinbox->pinarray.lock); lock_wrap_void(lf_pinbox_free, (LF_PINS *pins, void *addr), (pins, addr), - &pins->pinbox->pinstack.lock) + &pins->pinbox->pinarray.lock); /* memory allocator, lf_alloc-pin.c @@ -193,23 +195,23 @@ typedef struct st_lf_allocator { void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset); void lf_alloc_destroy(LF_ALLOCATOR *allocator); -uint lf_alloc_in_pool(LF_ALLOCATOR *allocator); +uint lf_alloc_pool_count(LF_ALLOCATOR *allocator); /* shortcut macros to access underlying pinbox functions from an LF_ALLOCATOR see _lf_pinbox_get_pins() and _lf_pinbox_put_pins() */ -#define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR)) -#define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR)) -#define _lf_alloc_get_pins(ALLOC) _lf_pinbox_get_pins(&(ALLOC)->pinbox) -#define lf_alloc_get_pins(ALLOC) lf_pinbox_get_pins(&(ALLOC)->pinbox) -#define _lf_alloc_put_pins(PINS) _lf_pinbox_put_pins(PINS) -#define lf_alloc_put_pins(PINS) 
lf_pinbox_put_pins(PINS) -#define lf_alloc_real_free(ALLOC, ADDR) my_free((gptr)(ADDR), MYF(0)) +#define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR)) +#define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR)) +#define _lf_alloc_get_pins(A, ST) _lf_pinbox_get_pins(&(A)->pinbox, (ST)) +#define lf_alloc_get_pins(A, ST) lf_pinbox_get_pins(&(A)->pinbox, (ST)) +#define _lf_alloc_put_pins(PINS) _lf_pinbox_put_pins(PINS) +#define lf_alloc_put_pins(PINS) lf_pinbox_put_pins(PINS) +#define lf_alloc_direct_free(ALLOC, ADDR) my_free((gptr)(ADDR), MYF(0)) lock_wrap(lf_alloc_new, void *, (LF_PINS *pins), (pins), - &pins->pinbox->pinstack.lock) + &pins->pinbox->pinarray.lock); /* extendible hash, lf_hash.c @@ -240,11 +242,11 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen); shortcut macros to access underlying pinbox functions from an LF_HASH see _lf_pinbox_get_pins() and _lf_pinbox_put_pins() */ -#define _lf_hash_get_pins(HASH) _lf_alloc_get_pins(&(HASH)->alloc) -#define lf_hash_get_pins(HASH) lf_alloc_get_pins(&(HASH)->alloc) -#define _lf_hash_put_pins(PINS) _lf_pinbox_put_pins(PINS) -#define lf_hash_put_pins(PINS) lf_pinbox_put_pins(PINS) - +#define _lf_hash_get_pins(HASH, ST) _lf_alloc_get_pins(&(HASH)->alloc, (ST)) +#define lf_hash_get_pins(HASH, ST) lf_alloc_get_pins(&(HASH)->alloc, (ST)) +#define _lf_hash_put_pins(PINS) _lf_pinbox_put_pins(PINS) +#define lf_hash_put_pins(PINS) lf_pinbox_put_pins(PINS) +#define lf_hash_search_unpin(PINS) lf_unpin((PINS), 2) /* cleanup */ diff --git a/include/my_atomic.h b/include/my_atomic.h index 5f328f32f3f..5fe7e521c19 100644 --- a/include/my_atomic.h +++ b/include/my_atomic.h @@ -225,6 +225,11 @@ make_atomic_fas(ptr) #undef make_atomic_fas_body #undef intptr +/* + the macro below defines (as an expression) the code that + will be run in spin-loops. Intel manuals recummend to have PAUSE there. 
+ It is expected to be defined in include/atomic/ *.h files +*/ #ifndef LF_BACKOFF #define LF_BACKOFF (1) #endif diff --git a/include/my_global.h b/include/my_global.h index 68f47a75e4b..0b089d41611 100644 --- a/include/my_global.h +++ b/include/my_global.h @@ -1034,6 +1034,8 @@ typedef long intptr; #error #endif +#define MY_ERRPTR ((void*)(intptr)1) + #ifdef USE_RAID /* The following is done with a if to not get problems with pre-processors diff --git a/include/pagecache.h b/include/pagecache.h index b917271f0c9..7951e48edb0 100644 --- a/include/pagecache.h +++ b/include/pagecache.h @@ -206,17 +206,21 @@ extern void pagecache_unlock_page(PAGECACHE *pagecache, pgcache_page_no_t pageno, enum pagecache_page_lock lock, enum pagecache_page_pin pin, - LSN first_REDO_LSN_for_page); + LSN first_REDO_LSN_for_page, + LSN lsn); extern void pagecache_unlock(PAGECACHE *pagecache, PAGECACHE_PAGE_LINK *link, enum pagecache_page_lock lock, enum pagecache_page_pin pin, - LSN first_REDO_LSN_for_page); + LSN first_REDO_LSN_for_page, + LSN lsn); extern void pagecache_unpin_page(PAGECACHE *pagecache, PAGECACHE_FILE *file, - pgcache_page_no_t pageno); + pgcache_page_no_t pageno, + LSN lsn); extern void pagecache_unpin(PAGECACHE *pagecache, - PAGECACHE_PAGE_LINK *link); + PAGECACHE_PAGE_LINK *link, + LSN lsn); extern int flush_pagecache_blocks(PAGECACHE *keycache, PAGECACHE_FILE *file, enum flush_type type); @@ -225,6 +229,12 @@ extern my_bool pagecache_delete_page(PAGECACHE *pagecache, pgcache_page_no_t pageno, enum pagecache_page_lock lock, my_bool flush); +extern my_bool pagecache_delete_pages(PAGECACHE *pagecache, + PAGECACHE_FILE *file, + pgcache_page_no_t pageno, + uint page_count, + enum pagecache_page_lock lock, + my_bool flush); extern void end_pagecache(PAGECACHE *keycache, my_bool cleanup); extern my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, LEX_STRING *str, diff --git a/mysql-test/r/maria.result b/mysql-test/r/maria.result index 515cb73eb2a..93d8863f40b 100644 --- a/mysql-test/r/maria.result +++ b/mysql-test/r/maria.result @@ -78,10 +78,15 @@ explain select a,b,c from t1; id select_type table type possible_keys key key_len ref rows Extra 1 SIMPLE t1 ALL NULL NULL NULL NULL 4 drop table t1; +set autocommit=0; +begin; CREATE TABLE t1 (a INT); INSERT INTO t1 VALUES (1), (2), (3); LOCK TABLES t1 WRITE; INSERT INTO t1 VALUES (1), (2), (3); +commit; +set autocommit=1; +UNLOCK TABLES; OPTIMIZE TABLE t1; Table Op Msg_type Msg_text test.t1 optimize status OK @@ -522,12 +527,15 @@ create table t1 (a int not null, primary key(a)) ROW_FORMAT=FIXED; create table t2 (a int not null, b int not null, primary key(a,b)) ROW_FORMAT=FIXED; insert into t1 values (1),(2),(3),(4),(5),(6); insert into t2 values (1,1),(2,1); +set autocommit=0; +begin; lock tables t1 read local, t2 read local; select straight_join * from t1,t2 force index (primary) where t1.a=t2.a; a a b 1 1 1 2 2 1 insert into t2 values(2,0); +commit; select straight_join * from t1,t2 force index (primary) where t1.a=t2.a; a a b 1 1 1 diff --git a/mysql-test/t/maria.test b/mysql-test/t/maria.test index bbf96d401f0..e9a0e146475 100644 --- a/mysql-test/t/maria.test +++ b/mysql-test/t/maria.test @@ -90,10 +90,15 @@ drop table t1; # # Test of OPTIMIZE of locked and modified tables # +set autocommit=0; +begin; CREATE TABLE t1 (a INT); INSERT INTO t1 VALUES (1), (2), (3); LOCK TABLES t1 WRITE; INSERT INTO t1 VALUES (1), (2), (3); +commit; +set autocommit=1; +UNLOCK TABLES; OPTIMIZE TABLE t1; DROP TABLE t1; @@ -510,10 +515,13 @@ create 
table t1 (a int not null, primary key(a)) ROW_FORMAT=FIXED; create table t2 (a int not null, b int not null, primary key(a,b)) ROW_FORMAT=FIXED; insert into t1 values (1),(2),(3),(4),(5),(6); insert into t2 values (1,1),(2,1); +set autocommit=0; +begin; lock tables t1 read local, t2 read local; select straight_join * from t1,t2 force index (primary) where t1.a=t2.a; connect (root,localhost,root,,test,$MASTER_MYPORT,$MASTER_MYSOCK); insert into t2 values(2,0); +commit; disconnect root; connection default; select straight_join * from t1,t2 force index (primary) where t1.a=t2.a; diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c index e964553a64c..51c4df7c94a 100644 --- a/mysys/lf_alloc-pin.c +++ b/mysys/lf_alloc-pin.c @@ -1,5 +1,5 @@ /* QQ: TODO multi-pinbox */ -/* Copyright (C) 2000 MySQL AB +/* Copyright (C) 2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -32,10 +32,11 @@ Pins are used to solve ABA problem. To use pins one must obey a pinning protocol: + 1. Let's assume that PTR is a shared pointer to an object. Shared means that any thread may modify it anytime to point to a different object and free the old object. Later the freed object may be potentially - allocated by another thread. If we're unlucky that another thread may + allocated by another thread. If we're unlucky that other thread may set PTR to point to this object again. This is ABA problem. 2. Create a local pointer LOCAL_PTR. 3. Pin the PTR in a loop: @@ -70,12 +71,34 @@ pins you have is limited (and small), keeping an object pinned prevents its reuse and cause unnecessary mallocs. + Explanations: + + 3. The loop is important. The following can occur: + thread1> LOCAL_PTR= PTR + thread2> free(PTR); PTR=0; + thread1> pin(PTR, PIN_NUMBER); + now thread1 cannot access LOCAL_PTR, even if it's pinned, + because it points to a freed memory. That is, it *must* + verify that it has indeed pinned PTR, the shared pointer. + + 6. When a thread wants to free some LOCAL_PTR, and it scans + all lists of pins to see whether it's pinned, it does it + upwards, from low pin numbers to high. Thus another thread + must copy an address from one pin to another in the same + direction - upwards, otherwise the scanning thread may + miss it. + Implementation details: + Pins are given away from a "pinbox". Pinbox is stack-based allocator. It used dynarray for storing pins, new elements are allocated by dynarray as necessary, old are pushed in the stack for reuse. ABA is solved by - versioning a pointer - because we use an array, a pointer to pins is 32 bit, - upper 32 bits are used for a version. + versioning a pointer - because we use an array, a pointer to pins is 16 bit, + upper 16 bits are used for a version. + + It is assumed that pins belong to a thread and are not transferable + between threads (LF_PINS::stack_ends_here being a primary reason + for this limitation). 
*/ #include <my_global.h> @@ -93,11 +116,11 @@ static void _lf_pinbox_real_free(LF_PINS *pins); void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset, lf_pinbox_free_func *free_func, void *free_func_arg) { - DBUG_ASSERT(sizeof(LF_PINS) == 128); DBUG_ASSERT(free_ptr_offset % sizeof(void *) == 0); - lf_dynarray_init(&pinbox->pinstack, sizeof(LF_PINS)); + compile_time_assert(sizeof(LF_PINS) == 128); + lf_dynarray_init(&pinbox->pinarray, sizeof(LF_PINS)); pinbox->pinstack_top_ver= 0; - pinbox->pins_in_stack= 0; + pinbox->pins_in_array= 0; pinbox->free_ptr_offset= free_ptr_offset; pinbox->free_func= free_func; pinbox->free_func_arg= free_func_arg; @@ -105,38 +128,72 @@ void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset, void lf_pinbox_destroy(LF_PINBOX *pinbox) { - lf_dynarray_destroy(&pinbox->pinstack); + lf_dynarray_destroy(&pinbox->pinarray); } /* Get pins from a pinbox. Usually called via lf_alloc_get_pins() or lf_hash_get_pins(). + SYNOPSYS + pinbox - + stack_end - a pointer to the end (top/bottom, depending on the + STACK_DIRECTION) of stack. Used for safe alloca. There's + no safety margin deducted, a caller should take care of it, + if necessary. + DESCRIPTION get a new LF_PINS structure from a stack of unused pins, or allocate a new one out of dynarray. + + NOTE + It is assumed that pins belong to a thread and are not transferable + between threads. */ -LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox) +LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end) { uint32 pins, next, top_ver; LF_PINS *el; - + /* + We have an array of max. 64k elements. + The highest index currently allocated is pinbox->pins_in_array. + Freed elements are in a lifo stack, pinstack_top_ver. + pinstack_top_ver is 32 bits; 16 low bits are the index in the + array, to the first element of the list. 16 high bits are a version + (every time the 16 low bits are updated, the 16 high bits are + incremented). Versioniong prevents the ABA problem. + */ top_ver= pinbox->pinstack_top_ver; do { if (!(pins= top_ver % LF_PINBOX_MAX_PINS)) { - pins= my_atomic_add32(&pinbox->pins_in_stack, 1)+1; - el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins); + /* the stack of free elements is empty */ + pins= my_atomic_add32(&pinbox->pins_in_array, 1)+1; + if (unlikely(pins >= LF_PINBOX_MAX_PINS)) + return 0; + /* + note that the first allocated element has index 1 (pins==1). 
+ index 0 is reserved to mean "NULL pointer" + */ + el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinarray, pins); + if (unlikely(!el)) + return 0; break; } - el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins); + el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinarray, pins); next= el->link; } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver, top_ver-pins+next+LF_PINBOX_MAX_PINS)); + /* + set el->link to the index of el in the dynarray (el->link has two usages: + - if element is allocated, it's its own index + - if element is free, it's its next element in the free stack + */ el->link= pins; el->purgatory_count= 0; el->pinbox= pinbox; + el->stack_ends_here= stack_end; return el; } @@ -171,25 +228,17 @@ void _lf_pinbox_put_pins(LF_PINS *pins) _lf_pinbox_real_free(pins); if (pins->purgatory_count) { - my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock); + my_atomic_rwlock_wrunlock(&pins->pinbox->pinarray.lock); pthread_yield(); - my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock); + my_atomic_rwlock_wrlock(&pins->pinbox->pinarray.lock); } } top_ver= pinbox->pinstack_top_ver; - if (nr == pinbox->pins_in_stack) - { - int32 tmp= nr; - if (my_atomic_cas32(&pinbox->pins_in_stack, &tmp, tmp-1)) - goto ret; - } - do { pins->link= top_ver % LF_PINBOX_MAX_PINS; } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver, top_ver-pins->link+nr+LF_PINBOX_MAX_PINS)); -ret: return; } @@ -228,7 +277,7 @@ struct st_harvester { /* callback for _lf_dynarray_iterate: - scan all pins or all threads and accumulate all pins + scan all pins of all threads and accumulate all pins */ static int harvest_pins(LF_PINS *el, struct st_harvester *hv) { @@ -243,13 +292,19 @@ static int harvest_pins(LF_PINS *el, struct st_harvester *hv) *hv->granary++= p; } } + /* + hv->npins may become negative below, but it means that + we're on the last dynarray page and harvest_pins() won't be + called again. We don't bother to make hv->npins() correct + (that is 0) in this case. 
+ */ hv->npins-= LF_DYNARRAY_LEVEL_LENGTH; return 0; } /* callback for _lf_dynarray_iterate: - scan all pins or all threads and see if addr is present there + scan all pins of all threads and see if addr is present there */ static int match_pins(LF_PINS *el, void *addr) { @@ -262,28 +317,35 @@ static int match_pins(LF_PINS *el, void *addr) return 0; } +#if STACK_DIRECTION < 0 +#define available_stack_size(END,CUR) (long) ((char*)(CUR) - (char*)(END)) +#else +#define available_stack_size(END,CUR) (long) ((char*)(END) - (char*)(CUR)) +#endif + /* - Scan the purgatory as free everything that can be freed + Scan the purgatory and free everything that can be freed */ static void _lf_pinbox_real_free(LF_PINS *pins) { - int npins; - void *list; - void **addr; + int npins, alloca_size; + void *list, **addr; + struct st_lf_alloc_node *first, *last= NULL; LF_PINBOX *pinbox= pins->pinbox; - npins= pinbox->pins_in_stack+1; + npins= pinbox->pins_in_array+1; #ifdef HAVE_ALLOCA + alloca_size= sizeof(void *)*LF_PINBOX_PINS*npins; /* create a sorted list of pinned addresses, to speed up searches */ - if (sizeof(void *)*LF_PINBOX_PINS*npins < my_thread_stack_size) + if (available_stack_size(&pinbox, pins->stack_ends_here) > alloca_size) { struct st_harvester hv; - addr= (void **) alloca(sizeof(void *)*LF_PINBOX_PINS*npins); + addr= (void **) alloca(alloca_size); hv.granary= addr; hv.npins= npins; /* scan the dynarray and accumulate all pinned addresses */ - _lf_dynarray_iterate(&pinbox->pinstack, + _lf_dynarray_iterate(&pinbox->pinarray, (lf_dynarray_func)harvest_pins, &hv); npins= hv.granary-addr; @@ -307,7 +369,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins) if (addr) /* use binary search */ { void **a, **b, **c; - for (a= addr, b= addr+npins-1, c= a+(b-a)/2; b-a>1; c= a+(b-a)/2) + for (a= addr, b= addr+npins-1, c= a+(b-a)/2; (b-a) > 1; c= a+(b-a)/2) if (cur == *c) a= b= c; else if (cur > *c) @@ -319,41 +381,52 @@ static void _lf_pinbox_real_free(LF_PINS *pins) } else /* no alloca - no cookie. linear search here */ { - if (_lf_dynarray_iterate(&pinbox->pinstack, + if (_lf_dynarray_iterate(&pinbox->pinarray, (lf_dynarray_func)match_pins, cur)) goto found; } } /* not pinned - freeing */ - pinbox->free_func(cur, pinbox->free_func_arg); + if (last) + last= last->next= (struct st_lf_alloc_node *)cur; + else + first= last= (struct st_lf_alloc_node *)cur; continue; found: /* pinned - keeping */ add_to_purgatory(pins, cur); } + if (last) + pinbox->free_func(first, last, pinbox->free_func_arg); } +/* lock-free memory allocator for fixed-size objects */ + +LF_REQUIRE_PINS(1); + /* - callback for _lf_pinbox_real_free to free an unpinned object - + callback for _lf_pinbox_real_free to free a list of unpinned objects - add it back to the allocator stack + + DESCRIPTION + 'first' and 'last' are the ends of the linked list of st_lf_alloc_node's: + first->el->el->....->el->last. Use first==last to free only one element. */ -static void alloc_free(struct st_lf_alloc_node *node, LF_ALLOCATOR *allocator) +static void alloc_free(struct st_lf_alloc_node *first, + struct st_lf_alloc_node *last, + LF_ALLOCATOR *allocator) { struct st_lf_alloc_node *tmp; tmp= allocator->top; do { - node->next= tmp; - } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, node) && + last->next= tmp; + } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, first) && LF_BACKOFF); } -/* lock-free memory allocator for fixed-size objects */ - -LF_REQUIRE_PINS(1); - /* - initialize lock-free allocatod. 
+ initialize lock-free allocator SYNOPSYS allocator - @@ -362,6 +435,8 @@ LF_REQUIRE_PINS(1); memory that is guaranteed to be unused after the object is put in the purgatory. Unused by ANY thread, not only the purgatory owner. + This memory will be used to link waiting-to-be-freed + objects in a purgatory list. */ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset) { @@ -370,12 +445,19 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset) allocator->top= 0; allocator->mallocs= 0; allocator->element_size= size; - DBUG_ASSERT(size >= (int)sizeof(void *)); - DBUG_ASSERT(free_ptr_offset < size); + DBUG_ASSERT(size >= sizeof(void*) + free_ptr_offset); } /* destroy the allocator, free everything that's in it + + NOTE + As every other init/destroy function here and elsewhere it + is not thread safe. No, this function is no different, ensure + that no thread needs the allocator before destroying it. + We are not responsible for any damage that may be caused by + accessing the allocator when it is being or has been destroyed. + Oh yes, and don't put your cat in a microwave. */ void lf_alloc_destroy(LF_ALLOCATOR *allocator) { @@ -410,16 +492,14 @@ void *_lf_alloc_new(LF_PINS *pins) } while (node != allocator->top && LF_BACKOFF); if (!node) { - if (!(node= (void *)my_malloc(allocator->element_size, - MYF(MY_WME|MY_ZEROFILL)))) - break; + node= (void *)my_malloc(allocator->element_size, MYF(MY_WME)); #ifdef MY_LF_EXTRA_DEBUG - my_atomic_add32(&allocator->mallocs, 1); + if (likely(node)) + my_atomic_add32(&allocator->mallocs, 1); #endif break; } - if (my_atomic_casptr((void **)&allocator->top, - (void *)&node, *(void **)node)) + if (my_atomic_casptr((void **)&allocator->top, (void *)&node, node->next)) break; } _lf_unpin(pins, 0); @@ -432,7 +512,7 @@ void *_lf_alloc_new(LF_PINS *pins) NOTE This is NOT thread-safe !!! */ -uint lf_alloc_in_pool(LF_ALLOCATOR *allocator) +uint lf_alloc_pool_count(LF_ALLOCATOR *allocator) { uint i; struct st_lf_alloc_node *node; diff --git a/mysys/lf_dynarray.c b/mysys/lf_dynarray.c index c6dd654bf03..770b1f9342b 100644 --- a/mysys/lf_dynarray.c +++ b/mysys/lf_dynarray.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2000 MySQL AB +/* Copyright (C) 2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,8 +21,6 @@ Memory is allocated in non-contiguous chunks. This data structure is not space efficient for sparse arrays. - The number of elements is limited to 4311810304 - Every element is aligned to sizeof(element) boundary (to avoid false sharing if element is big enough). @@ -32,6 +30,9 @@ to arrays of elements, on the second level it's an array of pointers to arrays of pointers to arrays of elements. And so on. + With four levels the number of elements is limited to 4311810304 + (but as in all functions index is uint, the real limit is 2^32-1) + Actually, it's wait-free, not lock-free ;-) */ @@ -192,6 +193,9 @@ static int recursive_iterate(LF_DYNARRAY *array, void *ptr, int level, each. _lf_dynarray_iterate() calls user-supplied function on every array from the set. 
It is the fastest way to scan the array, faster than for (i=0; i < N; i++) { func(_lf_dynarray_value(dynarray, i)); } + + NOTE + if func() returns non-zero, the scan is aborted */ int _lf_dynarray_iterate(LF_DYNARRAY *array, lf_dynarray_func func, void *arg) { diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c index fb2fb88492f..832f0eb5852 100644 --- a/mysys/lf_hash.c +++ b/mysys/lf_hash.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2000 MySQL AB +/* Copyright (C) 2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -36,6 +36,10 @@ typedef struct { uint32 hashnr; /* reversed hash number, for sorting */ const byte *key; uint keylen; + /* + data is stored here, directly after the keylen. + thus the pointer to data is (void*)(slist_element_ptr+1) + */ } LF_SLIST; /* @@ -77,20 +81,20 @@ static int lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr, retry: cursor->prev= (intptr *)head; - do { - cursor->curr= PTR(*cursor->prev); + do { /* PTR() isn't necessary below, head is a dummy node */ + cursor->curr= (LF_SLIST *)(*cursor->prev); _lf_pin(pins, 1, cursor->curr); - } while(*cursor->prev != (intptr)cursor->curr && LF_BACKOFF); + } while (*cursor->prev != (intptr)cursor->curr && LF_BACKOFF); for (;;) { - if (!cursor->curr) - return 0; + if (unlikely(!cursor->curr)) + return 0; /* end of the list */ do { /* QQ: XXX or goto retry ? */ link= cursor->curr->link; cursor->next= PTR(link); _lf_pin(pins, 0, cursor->next); - } while(link != cursor->curr->link && LF_BACKOFF); + } while (link != cursor->curr->link && LF_BACKOFF); cur_hashnr= cursor->curr->hashnr; cur_key= cursor->curr->key; cur_keylen= cursor->curr->keylen; @@ -114,6 +118,10 @@ retry: } else { + /* + we found a deleted node - be nice, help the other thread + and remove this deleted node + */ if (my_atomic_casptr((void **)cursor->prev, (void **)&cursor->curr, cursor->next)) _lf_alloc_free(pins, cursor->curr); @@ -139,31 +147,44 @@ retry: NOTE it uses pins[0..2], on return all pins are removed. + if there're nodes with the same key value, a new node is added before them. */ static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs, LF_SLIST *node, LF_PINS *pins, uint flags) { CURSOR cursor; - int res= -1; + int res; - do + for (;;) { if (lfind(head, cs, node->hashnr, node->key, node->keylen, &cursor, pins) && (flags & LF_HASH_UNIQUE)) + { res= 0; /* duplicate found */ + break; + } else { node->link= (intptr)cursor.curr; - assert(node->link != (intptr)node); - assert(cursor.prev != &node->link); + DBUG_ASSERT(node->link != (intptr)node); /* no circular references */ + DBUG_ASSERT(cursor.prev != &node->link); /* no circular references */ if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node)) + { res= 1; /* inserted ok */ + break; + } } - } while (res == -1); + } _lf_unpin(pins, 0); _lf_unpin(pins, 1); _lf_unpin(pins, 2); + /* + Note that cursor.curr is not pinned here and the pointer is unreliable, + the object may dissapear anytime. But if it points to a dummy node, the + pointer is safe, because dummy nodes are never freed - initialize_bucket() + uses this fact. + */ return res ? 
0 : cursor.curr; } @@ -183,24 +204,41 @@ static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr, const byte *key, uint keylen, LF_PINS *pins) { CURSOR cursor; - int res= -1; + int res; - do + for (;;) { if (!lfind(head, cs, hashnr, key, keylen, &cursor, pins)) - res= 1; + { + res= 1; /* not found */ + break; + } else + { + /* mark the node deleted */ if (my_atomic_casptr((void **)&(cursor.curr->link), - (void **)&cursor.next, 1+(char *)cursor.next)) + (void **)&cursor.next, + (void *)(((intptr)cursor.next) | 1))) { + /* and remove it from the list */ if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, cursor.next)) _lf_alloc_free(pins, cursor.curr); else + { + /* + somebody already "helped" us and removed the node ? + Let's check if we need to help that someone too! + (to ensure the number of "set DELETED flag" actions + is equal to the number of "remove from the list" actions) + */ lfind(head, cs, hashnr, key, keylen, &cursor, pins); + } res= 0; + break; } - } while (res == -1); + } + } _lf_unpin(pins, 0); _lf_unpin(pins, 1); _lf_unpin(pins, 2); @@ -226,7 +264,8 @@ static LF_SLIST *lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs, { CURSOR cursor; int res= lfind(head, cs, hashnr, key, keylen, &cursor, pins); - if (res) _lf_pin(pins, 2, cursor.curr); + if (res) + _lf_pin(pins, 2, cursor.curr); _lf_unpin(pins, 0); _lf_unpin(pins, 1); return res ? cursor.curr : 0; @@ -241,6 +280,11 @@ static inline const byte* hash_key(const LF_HASH *hash, return record + hash->key_offset; } +/* + compute the hash key value from the raw key. + note, that the hash value is limited to 2^31, because we need one + bit to distinguish between normal and dummy nodes. +*/ static inline uint calc_hash(LF_HASH *hash, const byte *key, uint keylen) { ulong nr1= 1, nr2= 4; @@ -249,8 +293,9 @@ static inline uint calc_hash(LF_HASH *hash, const byte *key, uint keylen) return nr1 & INT_MAX32; } -#define MAX_LOAD 1.0 -static void initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *); +#define MAX_LOAD 1.0 /* average number of elements in a bucket */ + +static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *); /* Initializes lf_hash, the arguments are compatible with hash_init @@ -261,7 +306,7 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, { lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size, offsetof(LF_SLIST, key)); - lf_dynarray_init(&hash->array, sizeof(LF_SLIST **)); + lf_dynarray_init(&hash->array, sizeof(LF_SLIST *)); hash->size= 1; hash->count= 0; hash->element_size= element_size; @@ -275,14 +320,19 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, void lf_hash_destroy(LF_HASH *hash) { - LF_SLIST *el= *(LF_SLIST **)_lf_dynarray_lvalue(&hash->array, 0); + LF_SLIST *el, **head= (LF_SLIST **)_lf_dynarray_value(&hash->array, 0); + + if (unlikely(!head)) + return; + el= *head; + while (el) { intptr next= el->link; if (el->hashnr & 1) - lf_alloc_real_free(&hash->alloc, el); + lf_alloc_direct_free(&hash->alloc, el); /* normal node */ else - my_free((void *)el, MYF(0)); + my_free((void *)el, MYF(0)); /* dummy node */ el= (LF_SLIST *)next; } lf_alloc_destroy(&hash->alloc); @@ -297,6 +347,7 @@ void lf_hash_destroy(LF_HASH *hash) RETURN 0 - inserted 1 - didn't (unique key conflict) + -1 - out of memory NOTE see linsert() for pin usage notes @@ -308,14 +359,18 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data) lf_rwlock_by_pins(pins); node= (LF_SLIST *)_lf_alloc_new(pins); + if 
(unlikely(!node)) + return -1; memcpy(node+1, data, hash->element_size); node->key= hash_key(hash, (byte *)(node+1), &node->keylen); hashnr= calc_hash(hash, node->key, node->keylen); bucket= hashnr % hash->size; el= _lf_dynarray_lvalue(&hash->array, bucket); - if (*el == NULL) - initialize_bucket(hash, el, bucket, pins); - node->hashnr= my_reverse_bits(hashnr) | 1; + if (unlikely(!el)) + return -1; + if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins))) + return -1; + node->hashnr= my_reverse_bits(hashnr) | 1; /* normal node */ if (linsert(el, hash->charset, node, pins, hash->flags)) { _lf_alloc_free(pins, node); @@ -330,9 +385,14 @@ int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data) } /* + DESCRIPTION + deletes an element with the given key from the hash (if a hash is + not unique and there're many elements with this key - the "first" + matching element is deleted) RETURN 0 - deleted 1 - didn't (not found) + -1 - out of memory NOTE see ldelete() for pin usage notes */ @@ -344,8 +404,16 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen) bucket= hashnr % hash->size; lf_rwlock_by_pins(pins); el= _lf_dynarray_lvalue(&hash->array, bucket); - if (*el == NULL) - initialize_bucket(hash, el, bucket, pins); + if (unlikely(!el)) + return -1; + /* + note that we still need to initialize_bucket here, + we cannot return "node not found", because an old bucket of that + node may've been split and the node was assigned to a new bucket + that was never accessed before and thus is not initialized. + */ + if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins))) + return -1; if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1, (byte *)key, keylen, pins)) { @@ -358,6 +426,12 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen) } /* + RETURN + a pointer to an element with the given key (if a hash is not unique and + there're many elements with this key - the "first" matching element) + NULL if nothing is found + MY_ERRPTR if OOM + NOTE see lsearch() for pin usage notes */ @@ -369,32 +443,51 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen) bucket= hashnr % hash->size; lf_rwlock_by_pins(pins); el= _lf_dynarray_lvalue(&hash->array, bucket); - if (*el == NULL) - initialize_bucket(hash, el, bucket, pins); + if (unlikely(!el)) + return MY_ERRPTR; + if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins))) + return MY_ERRPTR; found= lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1, (byte *)key, keylen, pins); lf_rwunlock_by_pins(pins); return found ? 
found+1 : 0; } -static const char *dummy_key= ""; +static const byte *dummy_key= ""; -static void initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node, +/* + RETURN + 0 - ok + -1 - out of memory +*/ +static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node, uint bucket, LF_PINS *pins) { uint parent= my_clear_highest_bit(bucket); LF_SLIST *dummy= (LF_SLIST *)my_malloc(sizeof(LF_SLIST), MYF(MY_WME)); LF_SLIST **tmp= 0, *cur; LF_SLIST * volatile *el= _lf_dynarray_lvalue(&hash->array, parent); - if (*el == NULL && bucket) - initialize_bucket(hash, el, parent, pins); - dummy->hashnr= my_reverse_bits(bucket); + if (unlikely(!el || !dummy)) + return -1; + if (*el == NULL && bucket && + unlikely(initialize_bucket(hash, el, parent, pins))) + return -1; + dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */ dummy->key= (char*) dummy_key; dummy->keylen= 0; - if ((cur= linsert(el, hash->charset, dummy, pins, 0))) + if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE))) { my_free((void *)dummy, MYF(0)); dummy= cur; } my_atomic_casptr((void **)node, (void **)&tmp, dummy); + /* + note that if the CAS above failed (after linsert() succeeded), + it would mean that some other thread has executed linsert() for + the same dummy node, its linsert() failed, it picked up our + dummy node (in "dummy= cur") and executed the same CAS as above. + Which means that even if CAS above failed we don't need to retry, + and we should not free(dummy) - there's no memory leak here + */ + return 0; } diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc index a714528a643..ecb966a4fbd 100644 --- a/storage/maria/ha_maria.cc +++ b/storage/maria/ha_maria.cc @@ -25,12 +25,14 @@ #include <myisampack.h> #include <my_bit.h> #include "ha_maria.h" +#include "trnman_public.h" #include "maria_def.h" #include "ma_rt_index.h" #include "ma_blockrec.h" ulong maria_recover_options= HA_RECOVER_NONE; +static handlerton *maria_hton; /* bits in maria_recover_options */ const char *maria_recover_names[]= @@ -464,7 +466,7 @@ ha_maria::ha_maria(handlerton *hton, TABLE_SHARE *table_arg): handler(hton, table_arg), file(0), int_table_flags(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER | HA_DUPLICATE_POS | HA_CAN_INDEX_BLOBS | HA_AUTO_PART_KEY | - HA_FILE_BASED | HA_CAN_GEOMETRY | HA_NO_TRANSACTIONS | + HA_FILE_BASED | HA_CAN_GEOMETRY | HA_CAN_INSERT_DELAYED | HA_CAN_BIT_FIELD | HA_CAN_RTREEKEYS | HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT), can_enable_indexes(1) @@ -1846,14 +1848,69 @@ int ha_maria::delete_table(const char *name) return maria_delete_table(name); } +#define THD_TRN (*(TRN **)thd_ha_data(thd, maria_hton)) int ha_maria::external_lock(THD *thd, int lock_type) { - return maria_lock_database(file, !table->s->tmp_table ? - lock_type : ((lock_type == F_UNLCK) ? 
- F_UNLCK : F_EXTRA_LCK)); + TRN *trn= THD_TRN; + DBUG_ENTER("ha_maria::external_lock"); + if (!trn && lock_type != F_UNLCK) /* no transaction yet - open it now */ + { + trn= trnman_new_trn(& thd->mysys_var->mutex, + & thd->mysys_var->suspend, + thd->thread_stack + STACK_DIRECTION * + (my_thread_stack_size - STACK_MIN_SIZE)); + if (!trn) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + DBUG_PRINT("info", ("THD_TRN set to 0x%lx", (ulong)trn)); + THD_TRN= trn; + if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trans_register_ha(thd, true, maria_hton); + } + if (lock_type != F_UNLCK) + { + this->file->trn= trn; + if (!trnman_increment_locked_tables(trn)) + { + trans_register_ha(thd, FALSE, maria_hton); + trnman_new_statement(trn); + } + } + else + { + this->file->trn= 0; /* TODO: remove it also in commit and rollback */ + if (trn && trnman_has_locked_tables(trn)) + { + if (!trnman_decrement_locked_tables(trn)) + { + /* autocommit ? rollback a transaction */ + if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) + { + trnman_rollback_trn(trn); + DBUG_PRINT("info", ("THD_TRN set to 0x0")); + THD_TRN= 0; + } + } + } + } + DBUG_RETURN(maria_lock_database(file, !table->s->tmp_table ? + lock_type : ((lock_type == F_UNLCK) ? + F_UNLCK : F_EXTRA_LCK))); } +int ha_maria::start_stmt(THD *thd, thr_lock_type lock_type) +{ + TRN *trn= THD_TRN; + DBUG_ASSERT(trn); // this may be called only after external_lock() + DBUG_ASSERT(lock_type != F_UNLCK); + if (!trnman_increment_locked_tables(trn)) + { + trans_register_ha(thd, false, maria_hton); + trnman_new_statement(trn); + } + return 0; +} THR_LOCK_DATA **ha_maria::store_lock(THD *thd, THR_LOCK_DATA **to, @@ -1936,6 +1993,7 @@ int ha_maria::create(const char *name, register TABLE *table_arg, share->avg_row_length); create_info.data_file_name= ha_create_info->data_file_name; create_info.index_file_name= ha_create_info->index_file_name; + create_info.transactional= row_type == BLOCK_RECORD; if (ha_create_info->options & HA_LEX_CREATE_TMP_TABLE) create_flags|= HA_CREATE_TMP_TABLE; @@ -2089,26 +2147,72 @@ bool ha_maria::check_if_incompatible_data(HA_CREATE_INFO *info, return COMPATIBLE_DATA_YES; } -extern int maria_panic(enum ha_panic_function flag); -int maria_panic(handlerton *hton, ha_panic_function flag) + +static int maria_hton_panic(handlerton *hton, ha_panic_function flag) { return maria_panic(flag); } -static int ha_maria_init(void *p) + +static int maria_commit(handlerton *hton __attribute__ ((unused)), + THD *thd, bool all) { - handlerton *maria_hton; + TRN *trn= THD_TRN; + DBUG_ENTER("maria_commit"); + trnman_reset_locked_tables(trn); + /* statement or transaction ? */ + if ((thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && !all) + DBUG_RETURN(0); // end of statement + DBUG_PRINT("info", ("THD_TRN set to 0x0")); + THD_TRN= 0; + DBUG_RETURN(trnman_commit_trn(trn) ? + HA_ERR_OUT_OF_MEM : 0); // end of transaction +} + +static int maria_rollback(handlerton *hton __attribute__ ((unused)), + THD *thd, bool all) +{ + TRN *trn= THD_TRN; + DBUG_ENTER("maria_rollback"); + trnman_reset_locked_tables(trn); + /* statement or transaction ? */ + if ((thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && !all) + { + trnman_rollback_statement(trn); + DBUG_RETURN(0); // end of statement + } + DBUG_PRINT("info", ("THD_TRN set to 0x0")); + THD_TRN= 0; + DBUG_RETURN(trnman_rollback_trn(trn) ? 
+ HA_ERR_OUT_OF_MEM : 0); // end of transaction +} + + +static int ha_maria_init(void *p) +{ maria_hton= (handlerton *)p; maria_hton->state= SHOW_OPTION_YES; maria_hton->db_type= DB_TYPE_MARIA; maria_hton->create= maria_create_handler; - maria_hton->panic= maria_panic; + maria_hton->panic= maria_hton_panic; + maria_hton->commit= maria_commit; + maria_hton->rollback= maria_rollback; /* TODO: decide if we support Maria being used for log tables */ maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES; - return test(maria_init()); + bzero(maria_log_pagecache, sizeof(*maria_log_pagecache)); + maria_data_root= mysql_real_data_home; + return (test(maria_init() || ma_control_file_create_or_open() || + (init_pagecache(maria_log_pagecache, + TRANSLOG_PAGECACHE_SIZE, 0, 0, + TRANSLOG_PAGE_SIZE) == 0) || + translog_init(maria_data_root, TRANSLOG_FILE_SIZE, + MYSQL_VERSION_ID, server_id, maria_log_pagecache, + TRANSLOG_DEFAULT_FLAGS) || + trnman_init())); } + struct st_mysql_storage_engine maria_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; diff --git a/storage/maria/ha_maria.h b/storage/maria/ha_maria.h index 031a3dc3b98..dd0a9594ef3 100644 --- a/storage/maria/ha_maria.h +++ b/storage/maria/ha_maria.h @@ -110,6 +110,7 @@ public: int extra_opt(enum ha_extra_function operation, ulong cache_size); int reset(void); int external_lock(THD * thd, int lock_type); + int start_stmt(THD *thd, thr_lock_type lock_type); int delete_all_rows(void); int disable_indexes(uint mode); int enable_indexes(uint mode); diff --git a/storage/maria/lockman.c b/storage/maria/lockman.c index cb305dc9bd6..8316d70bb29 100644 --- a/storage/maria/lockman.c +++ b/storage/maria/lockman.c @@ -538,7 +538,7 @@ void lockman_destroy(LOCKMAN *lm) { intptr next= el->link; if (el->hashnr & 1) - lf_alloc_real_free(&lm->alloc, el); + lf_alloc_direct_free(&lm->alloc, el); else my_free((void *)el, MYF(0)); el= (LOCK *)next; diff --git a/storage/maria/ma_bitmap.c b/storage/maria/ma_bitmap.c index e7ced9fc1e1..923525922da 100644 --- a/storage/maria/ma_bitmap.c +++ b/storage/maria/ma_bitmap.c @@ -1837,7 +1837,7 @@ err: /* - Free full pages from bitmap + Free full pages from bitmap and pagecache SYNOPSIS _ma_bitmap_free_full_pages() @@ -1846,7 +1846,8 @@ err: count Number of extents IMPLEMENTATION - Mark all full pages (not tails) from extents as free + Mark all full pages (not tails) from extents as free, both in bitmap + and page cache. RETURN 0 ok @@ -1865,6 +1866,9 @@ my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const byte *extents, uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE); if (!(page_count & TAIL_BIT)) { + if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page, + page_count, PAGECACHE_LOCK_WRITE, 1)) + DBUG_RETURN(1); if (_ma_reset_full_page_bits(info, &info->s->bitmap, page, page_count)) { pthread_mutex_unlock(&info->s->bitmap.bitmap_lock); diff --git a/storage/maria/ma_blockrec.c b/storage/maria/ma_blockrec.c index bb1cddd4d77..8d8adde46d1 100644 --- a/storage/maria/ma_blockrec.c +++ b/storage/maria/ma_blockrec.c @@ -162,7 +162,7 @@ - Store the parts in as many full-contiguous pages as possible. - The last part, that doesn't fill a full page, is stored in tail page. - When doing an insert of a new row, we don't have to have + When doing an insert of a new row, we don't have to have VER_PTR in the row. This will make rows that are not changed stored efficiently. 
On update and delete we would add TRANSID (if it was an old committed row) and VER_PTR to @@ -239,7 +239,7 @@ 06 00 00 00 00 80 00 First blob, stored at page 6-133 05 00 00 00 00 01 80 Tail of first blob (896 bytes) at page 5 86 00 00 00 00 80 00 Second blob, stored at page 134-262 - 05 00 00 00 00 02 80 Tail of second blob (896 bytes) at page 5 + 05 00 00 00 00 02 80 Tail of second blob (896 bytes) at page 5 05 00 5 integer FA Length of first varchar field (size 250) 00 60 Length of second varchar field (size 8192*3) @@ -256,6 +256,8 @@ #include "maria_def.h" #include "ma_blockrec.h" +#include <lf.h> +#include "trnman.h" /* Struct for having a cursor over a set of extent. @@ -298,6 +300,15 @@ static my_bool delete_head_or_tail(MARIA_HA *info, static void _ma_print_directory(byte *buff, uint block_size); static void compact_page(byte *buff, uint block_size, uint rownr, my_bool extend_block); +static uchar *store_page_range(uchar *to, MARIA_BITMAP_BLOCK *block, + uint block_size, ulong length); +static size_t fill_insert_undo_parts(MARIA_HA *info, const byte *record, + LEX_STRING *log_parts, + uint *log_parts_count); +static size_t fill_update_undo_parts(MARIA_HA *info, const byte *oldrec, + const byte *newrec, + LEX_STRING *log_parts, + uint *log_parts_count); /**************************************************************************** Initialization @@ -401,8 +412,9 @@ my_bool _ma_init_block_record(MARIA_HA *info) DBUG_ENTER("_ma_init_block_record"); if (!my_multi_malloc(MY_WME, - &row->empty_bits_buffer, info->s->base.pack_bytes, - &row->field_lengths, info->s->base.max_field_lengths, + &row->empty_bits, info->s->base.pack_bytes, + &row->field_lengths, + info->s->base.max_field_lengths + 2, &row->blob_lengths, sizeof(ulong) * info->s->base.blobs, &row->null_field_lengths, (sizeof(uint) * (info->s->base.fields - @@ -410,21 +422,37 @@ my_bool _ma_init_block_record(MARIA_HA *info) EXTRA_LENGTH_FIELDS)), &row->tail_positions, (sizeof(MARIA_RECORD_POS) * (info->s->base.blobs + 2)), - &new_row->empty_bits_buffer, info->s->base.pack_bytes, + &new_row->empty_bits, info->s->base.pack_bytes, &new_row->field_lengths, - info->s->base.max_field_lengths, + info->s->base.max_field_lengths + 2, &new_row->blob_lengths, sizeof(ulong) * info->s->base.blobs, &new_row->null_field_lengths, (sizeof(uint) * (info->s->base.fields - info->s->base.blobs + EXTRA_LENGTH_FIELDS)), + &info->log_row_parts, + sizeof(*info->log_row_parts) * + (TRANSLOG_INTERNAL_PARTS + 2 + + info->s->base.fields + 2), + &info->update_field_data, + (info->s->base.fields * 4 + + info->s->base.max_field_lengths + 1 + 4), NullS, 0)) DBUG_RETURN(1); + /* Skip over bytes used to store length of field length for logging */ + row->field_lengths+= 2; + new_row->field_lengths+= 2; if (my_init_dynamic_array(&info->bitmap_blocks, sizeof(MARIA_BITMAP_BLOCK), ELEMENTS_RESERVED_FOR_MAIN_PART, 16)) - my_free((char*) &info->bitmap_blocks, MYF(0)); + goto err; + /* The following should be big enough for all purposes */ + if (my_init_dynamic_array(&info->pinned_pages, + sizeof(MARIA_PINNED_PAGE), + max(info->s->base.blobs + 2, + MARIA_MAX_TREE_LEVELS*2), 16)) + goto err; row->base_length= new_row->base_length= info->s->base_length; /* @@ -434,15 +462,21 @@ my_bool _ma_init_block_record(MARIA_HA *info) row->null_field_lengths+= EXTRA_LENGTH_FIELDS; new_row->null_field_lengths+= EXTRA_LENGTH_FIELDS; + DBUG_RETURN(0); + +err: + _ma_end_block_record(info); + DBUG_RETURN(1); } void _ma_end_block_record(MARIA_HA *info) { DBUG_ENTER("_ma_end_block_record"); - 
my_free((gptr) info->cur_row.empty_bits_buffer, MYF(MY_ALLOW_ZERO_PTR)); + my_free((gptr) info->cur_row.empty_bits, MYF(MY_ALLOW_ZERO_PTR)); delete_dynamic(&info->bitmap_blocks); + delete_dynamic(&info->pinned_pages); my_free((gptr) info->cur_row.extents, MYF(MY_ALLOW_ZERO_PTR)); /* The data file is closed, when needed, in ma_once_end_block_record(). @@ -508,6 +542,44 @@ static my_bool check_if_zero(byte *pos, uint length) /* + Unpin all pinned pages + + SYNOPSIS + _ma_unpin_all_pages() + info Maria handler + undo_lsn LSN for undo pages. 0 if we shouldn't write undo (error) + + NOTE + We unpin pages in the reverse order as they where pinned; This may not + be strictly necessary but may simplify things in the future. + + RETURN + 0 ok + 1 error (fatal disk error) + +*/ + +void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn) +{ + MARIA_PINNED_PAGE *page_link= ((MARIA_PINNED_PAGE*) + dynamic_array_ptr(&info->pinned_pages, 0)); + MARIA_PINNED_PAGE *pinned_page= page_link + info->pinned_pages.elements; + DBUG_ENTER("_ma_unpin_all_pages"); + DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn)); + + /* True if not disk error */ + DBUG_ASSERT(undo_lsn != 0 || info->s->base.transactional == 0); + + while (pinned_page-- != page_link) + pagecache_unlock(info->s->pagecache, pinned_page->link, + pinned_page->unlock, PAGECACHE_UNPIN, 0, undo_lsn); + + info->pinned_pages.elements= 0; + DBUG_VOID_RETURN; +} + + +/* Find free position in directory SYNOPSIS @@ -576,7 +648,7 @@ static byte *find_free_position(byte *buff, uint block_size, uint *res_rownr, if (max_entry == MAX_ROWS_PER_PAGE) DBUG_RETURN(0); /* Check if there is place for the directory entry */ - if ((dir - buff) < first_pos) + if ((uint) (dir - buff) < first_pos) { /* Create place for directory */ compact_page(buff, block_size, max_entry-1, 0); @@ -595,9 +667,9 @@ static byte *find_free_position(byte *buff, uint block_size, uint *res_rownr, /* Reduce directory entry size from free space size */ (*empty_space)-= DIR_ENTRY_SIZE; DBUG_RETURN(dir); - } + /**************************************************************************** Updating records ****************************************************************************/ @@ -630,8 +702,7 @@ static void calc_record_size(MARIA_HA *info, const byte *record, row->blob_length= row->extents_count= 0; /* Create empty bitmap and calculate length of each varlength/char field */ - bzero(row->empty_bits_buffer, share->base.pack_bytes); - row->empty_bits= row->empty_bits_buffer; + bzero(row->empty_bits, share->base.pack_bytes); field_length_data= row->field_lengths; for (column= share->columndef + share->base.fixed_not_null_fields, end_column= share->columndef + share->base.fields; @@ -918,20 +989,23 @@ struct st_row_pos_info byte *data; /* Place for data */ byte *dir; /* Directory */ uint length; /* Length for data */ - uint offset; /* Offset to directory */ + uint rownr; /* Offset in directory */ uint empty_space; /* Space left on page */ }; static my_bool get_head_or_tail_page(MARIA_HA *info, MARIA_BITMAP_BLOCK *block, byte *buff, uint length, uint page_type, + enum pagecache_page_lock lock, struct st_row_pos_info *res) { uint block_size; + MARIA_PINNED_PAGE page_link; + MARIA_SHARE *share= info->s; DBUG_ENTER("get_head_or_tail_page"); DBUG_PRINT("enter", ("length: %u", length)); - block_size= info->s->block_size; + block_size= share->block_size; if (block->org_bitmap_value == 0) /* Empty block */ { /* New page */ @@ -951,7 +1025,7 @@ static my_bool get_head_or_tail_page(MARIA_HA *info, 
res->empty_space= res->length= (block_size - PAGE_OVERHEAD_SIZE); res->data= (buff + PAGE_HEADER_SIZE); res->dir= res->data + res->length; - res->offset= 0; + res->rownr= 0; /* Store position to the first row */ int2store(res->dir, PAGE_HEADER_SIZE); DBUG_ASSERT(length <= res->length); @@ -961,15 +1035,23 @@ static my_bool get_head_or_tail_page(MARIA_HA *info, byte *dir; /* TODO: lock the page */ /* Read old page */ - DBUG_ASSERT(info->s->pagecache->block_size == block_size); - if (!(res->buff= pagecache_read(info->s->pagecache, + DBUG_ASSERT(share->pagecache->block_size == block_size); + if (!(res->buff= pagecache_read(share->pagecache, &info->dfile, (my_off_t) block->page, 0, - buff, PAGECACHE_PLAIN_PAGE, - PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) + buff, share->page_type, + lock, &page_link.link))) DBUG_RETURN(1); + if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED) + { + page_link.unlock= (lock == PAGECACHE_LOCK_READ ? + PAGECACHE_LOCK_READ_UNLOCK : + PAGECACHE_LOCK_WRITE_UNLOCK); + push_dynamic(&info->pinned_pages, (void*) &page_link); + } + DBUG_ASSERT((res->buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type); - if (!(dir= find_free_position(res->buff, block_size, &res->offset, + if (!(dir= find_free_position(res->buff, block_size, &res->rownr, &res->length, &res->empty_space))) goto crashed; @@ -977,9 +1059,9 @@ static my_bool get_head_or_tail_page(MARIA_HA *info, { if (res->empty_space + res->length < length) { - compact_page(res->buff, block_size, res->offset, 1); + compact_page(res->buff, block_size, res->rownr, 1); /* All empty space are now after current position */ - dir= (res->buff + block_size - DIR_ENTRY_SIZE * res->offset - + dir= (res->buff + block_size - DIR_ENTRY_SIZE * res->rownr - PAGE_SUFFIX_SIZE); res->length= res->empty_space= uint2korr(dir+2); } @@ -998,7 +1080,7 @@ crashed: /* - Write tail of non-blob-data or blob + Write tail for head data or blob SYNOPSIS write_tail() @@ -1023,9 +1105,11 @@ static my_bool write_tail(MARIA_HA *info, byte *row_part, uint length) { MARIA_SHARE *share= share= info->s; + MARIA_PINNED_PAGE page_link; uint block_size= share->block_size, empty_space; struct st_row_pos_info row_pos; my_off_t position; + my_bool res, block_is_read; DBUG_ENTER("write_tail"); DBUG_PRINT("enter", ("page: %lu length: %u", (ulong) block->page, length)); @@ -1034,10 +1118,36 @@ static my_bool write_tail(MARIA_HA *info, /* page will be pinned & locked by get_head_or_tail_page */ if (get_head_or_tail_page(info, block, info->keyread_buff, length, - TAIL_PAGE, &row_pos)) + TAIL_PAGE, PAGECACHE_LOCK_WRITE, + &row_pos)) DBUG_RETURN(1); + block_is_read= block->org_bitmap_value != 0; memcpy(row_pos.data, row_part, length); + + { + /* Log changes in tail block */ + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; + LSN lsn; + + /* Log REDO changes of tail page */ + fileid_store(log_data, info->dfile.file); + page_store(log_data+ FILEID_STORE_SIZE, block->page); + dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE, + row_pos.rownr); + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char*) row_pos.data; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length; + if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_TAIL, + info->trn->short_id, NULL, share, + sizeof(log_data) + length, + TRANSLOG_INTERNAL_PARTS + 2, + log_array)) + DBUG_RETURN(1); + } + /* Don't allocate 
smaller block than MIN_TAIL_SIZE (we want to give rows some place to grow in the future) @@ -1047,7 +1157,7 @@ static my_bool write_tail(MARIA_HA *info, int2store(row_pos.dir + 2, length); empty_space= row_pos.empty_space - length; int2store(row_pos.buff + EMPTY_SPACE_OFFSET, empty_space); - block->page_count= row_pos.offset + TAIL_BIT; + block->page_count= row_pos.rownr + TAIL_BIT; /* If there is less directory entries free than number of possible tails we can write for a row, we mark the page full to ensure that we don't @@ -1055,7 +1165,7 @@ static my_bool write_tail(MARIA_HA *info, than it can hold */ block->empty_space= ((uint) ((uchar*) row_pos.buff)[DIR_COUNT_OFFSET] <= - MAX_ROWS_PER_PAGE - 1 - info->s->base.blobs ? + MAX_ROWS_PER_PAGE - 1 - share->base.blobs ? empty_space : 0); block->used= BLOCKUSED_USED | BLOCKUSED_TAIL; @@ -1063,15 +1173,28 @@ static my_bool write_tail(MARIA_HA *info, position= (my_off_t) block->page * block_size; if (info->state->data_file_length <= position) info->state->data_file_length= position + block_size; - /* TODO: left the page pinned (or pin it if it is new) and unlock\ - the page (do not lock if it is new */ + DBUG_ASSERT(share->pagecache->block_size == block_size); - DBUG_RETURN(pagecache_write(share->pagecache, - &info->dfile, block->page, 0, - row_pos.buff,PAGECACHE_PLAIN_PAGE, - PAGECACHE_LOCK_LEFT_UNLOCKED, - PAGECACHE_PIN_LEFT_UNPINNED, - PAGECACHE_WRITE_DELAY, 0)); + if (!(res= pagecache_write(share->pagecache, + &info->dfile, block->page, 0, + row_pos.buff,share->page_type, + block_is_read ? PAGECACHE_LOCK_WRITE_TO_READ : + PAGECACHE_LOCK_READ, + block_is_read ? PAGECACHE_PIN_LEFT_PINNED : + PAGECACHE_PIN, + PAGECACHE_WRITE_DELAY, &page_link.link))) + { + page_link.unlock= PAGECACHE_LOCK_READ_UNLOCK; + if (block_is_read) + { + /* Change the lock used when we read the page */ + set_dynamic(&info->pinned_pages, (void*) &page_link, + info->pinned_pages.elements-1); + } + else + push_dynamic(&info->pinned_pages, (void*) &page_link); + } + DBUG_RETURN(res); } @@ -1081,13 +1204,22 @@ static my_bool write_tail(MARIA_HA *info, SYNOPSIS write_full_pages() info Maria handler + lsn LSN for the undo record block Where to write data data Data to write length Length of data + NOTES + Logging of the changes to the full pages are done in the caller + write_block_record(). 
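write_tail() above now writes the page back with a different lock and pin action depending on whether the block was read from the page cache (already write-locked and pinned) or is brand new. A minimal sketch of that decision, using stand-in enums rather than the real pagecache_page_lock / pagecache_page_pin constants:

enum lock_action { LOCK_WRITE_TO_READ, LOCK_READ };
enum pin_action  { PIN_LEFT_PINNED, PIN_NOW };

struct write_plan { enum lock_action lock; enum pin_action pin; };

/* Pick lock/pin actions for writing a head or tail page back to the cache. */
static struct write_plan plan_page_write(int block_was_read)
{
  struct write_plan plan;
  if (block_was_read)
  {
    /* Page was read with a write lock and is already pinned: downgrade the
       lock to read and keep the existing pin (its entry is updated in place). */
    plan.lock= LOCK_WRITE_TO_READ;
    plan.pin=  PIN_LEFT_PINNED;
  }
  else
  {
    /* Newly allocated page: take a read lock and pin it now (a new entry is
       pushed onto the pinned-pages array). */
    plan.lock= LOCK_READ;
    plan.pin=  PIN_NOW;
  }
  return plan;
}

int main(void)
{
  return plan_page_write(1).lock == LOCK_WRITE_TO_READ ? 0 : 1;
}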
+ + RETURN + 0 ok + 1 error on write */ static my_bool write_full_pages(MARIA_HA *info, + LSN lsn, MARIA_BITMAP_BLOCK *block, byte *data, ulong length) { @@ -1128,20 +1260,16 @@ static my_bool write_full_pages(MARIA_HA *info, if (info->state->data_file_length < position) info->state->data_file_length= position; } - bzero(buff, LSN_SIZE); + lsn_store(buff, lsn); buff[PAGE_TYPE_OFFSET]= (byte) BLOB_PAGE; copy_length= min(data_size, length); memcpy(buff + LSN_SIZE + PAGE_TYPE_SIZE, data, copy_length); length-= copy_length; - /* - TODO: replace PAGECACHE_PLAIN_PAGE with PAGECACHE_LSN_PAGE when - LSN on the pages will be implemented - */ DBUG_ASSERT(share->pagecache->block_size == block_size); if (pagecache_write(share->pagecache, &info->dfile, page, 0, - buff, PAGECACHE_PLAIN_PAGE, + buff, share->page_type, PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_PIN_LEFT_UNPINNED, PAGECACHE_WRITE_DELAY, @@ -1154,6 +1282,46 @@ static my_bool write_full_pages(MARIA_HA *info, } +/* + Store ranges of full pages in compact format for logging + + SYNOPSIS + store_page_range() + to Store data here + block Where pages are to be written + block_size block size + length Length of data to be written + Normally this is full pages, except for the last + tail block that may only partly fit the last page. + + RETURN + # end position for 'to' +*/ + +static uchar *store_page_range(uchar *to, MARIA_BITMAP_BLOCK *block, + uint block_size, ulong length) +{ + uint data_size= FULL_PAGE_SIZE(block_size); + ulong pages_left= (length + data_size -1) / data_size; + uint page_count; + DBUG_ENTER("store_page_range"); + + do + { + ulonglong page; + page= block->page; + page_count= block->page_count; + block++; + if (page_count > pages_left) + page_count= pages_left; + + page_store(to, page); + to+= PAGE_STORE_SIZE; + pagerange_store(to, page_count); + to+= PAGERANGE_STORE_SIZE; + } while ((pages_left-= page_count)); + DBUG_RETURN(to); +} /* @@ -1186,8 +1354,8 @@ static void store_extent_info(byte *to, if (likely(block->used & BLOCKUSED_USED)) { DBUG_ASSERT(block->page_count != 0); - int5store(to, block->page); - int2store(to + 5, block->page_count); + page_store(to, block->page); + pagerange_store(to + PAGE_STORE_SIZE, block->page_count); to+= ROW_EXTENT_SIZE; if (!first_found) { @@ -1204,25 +1372,113 @@ static void store_extent_info(byte *to, bzero(to, (my_size_t) (row_extents_second_part + copy_length - to)); } + +/* + Free regions of pages with logging + + RETURN + 0 ok + 1 error +*/ + +static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row) +{ + uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; + LSN lsn; + size_t extents_length= row->extents_count * ROW_EXTENT_SIZE; + DBUG_ENTER("free_full_pages"); + + fileid_store(log_data, info->dfile.file); + pagerange_store(log_data + FILEID_STORE_SIZE, + row->extents_count); + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= row->extents; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extents_length; + if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS, + info->trn->short_id, NULL, info->s, + sizeof(log_data) + extents_length, + TRANSLOG_INTERNAL_PARTS + 2, log_array)) + DBUG_RETURN(1); + + DBUG_RETURN (_ma_bitmap_free_full_pages(info, row->extents, + row->extents_count)); +} + + +/* + Free one page range + + NOTES + This is very similar to free_full_pages() + + RETURN + 0 ok + 1 error +*/ + 
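store_page_range() above compresses the list of full pages into (page, page_count) pairs, clamping the last range to the number of pages the data actually needs. A stand-alone sketch of the same arithmetic with a simplified extent structure:

#include <stdio.h>

struct extent { unsigned long long page; unsigned page_count; };

static void print_page_ranges(const struct extent *ext, unsigned data_size,
                              unsigned long length)
{
  /* Number of full pages needed to hold 'length' bytes of data. */
  unsigned long pages_left= (length + data_size - 1) / data_size;
  do
  {
    unsigned page_count= ext->page_count;
    if (page_count > pages_left)
      page_count= pages_left;              /* last range may be partial */
    printf("pages %llu..%llu\n", ext->page, ext->page + page_count - 1);
    ext++;
    pages_left-= page_count;
  } while (pages_left);
}

int main(void)
{
  struct extent extents[]= { { 100, 3 }, { 200, 10 } };
  print_page_ranges(extents, 8192, 5 * 8192 + 100);  /* needs 6 pages */
  return 0;
}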
+static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count) +{ + uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + ROW_EXTENT_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; + LSN lsn; + my_bool res= 0; + + if (pagecache_delete_pages(info->s->pagecache, &info->dfile, + page, count, PAGECACHE_LOCK_WRITE, 0)) + res= 1; + + fileid_store(log_data, info->dfile.file); + pagerange_store(log_data + FILEID_STORE_SIZE, 1); + int5store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, + page); + int2store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + 5, + count); + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + + if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS, + info->trn->short_id, NULL, info->s, + sizeof(log_data), + TRANSLOG_INTERNAL_PARTS + 1, log_array)) + res= 1; + + pthread_mutex_lock(&info->s->bitmap.bitmap_lock); + if (_ma_reset_full_page_bits(info, &info->s->bitmap, page, + count)) + res= 1; + pthread_mutex_unlock(&info->s->bitmap.bitmap_lock); + return res; +} + + /* Write a record to a (set of) pages SYNOPSIS write_block_record() info Maria handler + old_record Orignal record in case of update; NULL in case of insert record Record we should write row Statistics about record (calculated by calc_record_size()) map_blocks On which pages the record should be stored row_pos Position on head page where to put head part of record + NOTES + On return all pinned pages are released. + RETURN 0 ok 1 error */ -static my_bool write_block_record(MARIA_HA *info, const byte *record, +static my_bool write_block_record(MARIA_HA *info, + const byte *old_record, const byte *record, MARIA_ROW *row, MARIA_BITMAP_BLOCKS *bitmap_blocks, + my_bool head_block_is_read, struct st_row_pos_info *row_pos) { byte *data, *end_of_data, *tmp_data_used, *tmp_data; @@ -1230,18 +1486,19 @@ static my_bool write_block_record(MARIA_HA *info, const byte *record, byte *field_length_data; byte *page_buff; MARIA_BITMAP_BLOCK *block, *head_block; - MARIA_SHARE *share; + MARIA_SHARE *share= info->s; MARIA_COLUMNDEF *column, *end_column; + MARIA_PINNED_PAGE page_link; uint block_size, flag; ulong *blob_lengths; + my_bool row_extents_in_use, blob_full_pages_exists; + LSN lsn; my_off_t position; - my_bool row_extents_in_use; DBUG_ENTER("write_block_record"); LINT_INIT(row_extents_first_part); LINT_INIT(row_extents_second_part); - share= info->s; head_block= bitmap_blocks->block; block_size= share->block_size; @@ -1458,6 +1715,7 @@ static my_bool write_block_record(MARIA_HA *info, const byte *record, the tail page of the non-blob data) */ + blob_full_pages_exists= 0; if (row_extents_in_use) { if (column != end_column) /* If blob fields */ @@ -1479,11 +1737,16 @@ static my_bool write_block_record(MARIA_HA *info, const byte *record, memcpy_fixed((byte *) &blob_pos, record + column->offset + length, sizeof(char*)); length= *blob_lengths % FULL_PAGE_SIZE(block_size); /* tail size */ + if (length != *blob_lengths) + blob_full_pages_exists= 1; if (write_tail(info, block + block->sub_blocks-1, blob_pos + *blob_lengths - length, length)) goto disk_err; } + else + blob_full_pages_exists= 1; + for (end_block= block + block->sub_blocks; block < end_block; block++) { /* @@ -1668,18 +1931,196 @@ static my_bool write_block_record(MARIA_HA *info, const byte *record, row_extents_second_part, head_block+1, bitmap_blocks->count - 1); } + + if (share->base.transactional) + { + uchar log_data[FILEID_STORE_SIZE + 
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; + size_t data_length= (size_t) (data - row_pos->data); + + /* Log REDO changes of head page */ + fileid_store(log_data, info->dfile.file); + page_store(log_data+ FILEID_STORE_SIZE, head_block->page); + dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE, + row_pos->rownr); + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char*) row_pos->data; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_length; + if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_HEAD, + info->trn->short_id, NULL, share, + sizeof(log_data) + data_length, + TRANSLOG_INTERNAL_PARTS + 2, log_array)) + goto disk_err; + } + /* Increase data file size, if extended */ position= (my_off_t) head_block->page * block_size; if (info->state->data_file_length <= position) info->state->data_file_length= position + block_size; + DBUG_ASSERT(share->pagecache->block_size == block_size); if (pagecache_write(share->pagecache, &info->dfile, head_block->page, 0, - page_buff, PAGECACHE_PLAIN_PAGE, - PAGECACHE_LOCK_LEFT_UNLOCKED, - PAGECACHE_PIN_LEFT_UNPINNED, - PAGECACHE_WRITE_DELAY, 0)) + page_buff, share->page_type, + head_block_is_read ? PAGECACHE_LOCK_WRITE_TO_READ : + PAGECACHE_LOCK_READ, + head_block_is_read ? PAGECACHE_PIN_LEFT_PINNED : + PAGECACHE_PIN, + PAGECACHE_WRITE_DELAY, &page_link.link)) goto disk_err; + page_link.unlock= PAGECACHE_LOCK_READ_UNLOCK; + if (head_block_is_read) + { + /* Head page is always the first pinned page */ + set_dynamic(&info->pinned_pages, (void*) &page_link, 0); + } + else + push_dynamic(&info->pinned_pages, (void*) &page_link); + + if (share->base.transactional && (tmp_data_used || blob_full_pages_exists)) + { + /* + Log REDO writes for all full pages (head part and all blobs) + We write all here to be able to generate the UNDO record early + so that we can write the LSN for the UNDO record to all full pages. 
+ */ + uchar tmp_log_data[FILEID_STORE_SIZE + LSN_STORE_SIZE + PAGE_STORE_SIZE + + ROW_EXTENT_SIZE * ROW_EXTENTS_ON_STACK]; + uchar *log_data, *log_pos; + LEX_STRING tmp_log_array[TRANSLOG_INTERNAL_PARTS + 2 + + ROW_EXTENTS_ON_STACK]; + LEX_STRING *log_array_pos, *log_array; + int error; + ulong log_entry_length= 0; + + /* If few extents, then allocate things on stack to avoid a malloc call */ + if (bitmap_blocks->count < ROW_EXTENTS_ON_STACK) + { + log_array= tmp_log_array; + log_data= tmp_log_data; + } + else + { + if (my_multi_malloc(MY_WME, &log_array, + (uint) ((bitmap_blocks->count + + TRANSLOG_INTERNAL_PARTS + 2) * + sizeof(*log_array)), + &log_data, bitmap_blocks->count * ROW_EXTENT_SIZE, + NullS)) + goto disk_err; + } + fileid_store(log_data, info->dfile.file); + log_pos= log_data + FILEID_STORE_SIZE; + log_array_pos= log_array+ TRANSLOG_INTERNAL_PARTS+1; + + if (tmp_data_used) + { + /* Full head pages */ + size_t data_length= (ulong) (tmp_data - info->rec_buff); + log_pos= store_page_range(log_pos, head_block+1, block_size, + data_length); + log_array_pos->str= (char*) info->rec_buff; + log_array_pos->length= data_length; + log_entry_length+= data_length; + log_array_pos++; + } + if (blob_full_pages_exists) + { + MARIA_COLUMNDEF *tmp_column= column; + ulong *tmp_blob_lengths= blob_lengths; + MARIA_BITMAP_BLOCK *tmp_block= block; + + /* Full blob pages */ + for (; tmp_column < end_column; tmp_column++, tmp_blob_lengths++) + { + ulong blob_length; + uint length; + + if (!*tmp_blob_lengths) /* Null or "" */ + continue; + length= tmp_column->length - portable_sizeof_char_ptr; + blob_length= *tmp_blob_lengths; + if (tmp_block[tmp_block->sub_blocks - 1].used & BLOCKUSED_TAIL) + blob_length-= (blob_length % FULL_PAGE_SIZE(block_size)); + if (blob_length) + { + log_array_pos->str= (char*) record + column->offset + length; + log_array_pos->length= blob_length; + log_entry_length+= blob_length; + log_array_pos++; + + log_pos= store_page_range(log_pos, tmp_block, block_size, + blob_length); + tmp_block+= tmp_block->sub_blocks; + } + } + } + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (size_t) (log_pos - + log_data); + log_entry_length+= (log_pos - log_data); + + error= translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_BLOBS, + info->trn->short_id, NULL, share, + log_entry_length, (uint) (log_array_pos - + log_array), + log_array); + if (log_array != tmp_log_array) + my_free((gptr) log_array, MYF(0)); + if (error) + goto disk_err; + } + + /* Write UNDO record */ + if (share->base.transactional) + { + uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; + LEX_STRING *log_array= info->log_row_parts; + + /* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_INSERT share same header */ + lsn_store(log_data, info->trn->undo_lsn); + fileid_store(log_data + LSN_STORE_SIZE, info->dfile.file); + page_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE, + head_block->page); + dirpos_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE + + PAGE_STORE_SIZE, + row_pos->rownr); + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + + if (!old_record) + { + /* Write UNDO log record for the INSERT */ + if (translog_write_record(&info->trn->undo_lsn, LOGREC_UNDO_ROW_INSERT, + info->trn->short_id, NULL, share, + sizeof(log_data), + TRANSLOG_INTERNAL_PARTS + 1, + log_array)) + goto disk_err; + } + else + { + /* Write UNDO log record for 
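write_block_record() keeps the REDO log arrays on the stack when the row has fewer than ROW_EXTENTS_ON_STACK extents and only then falls back to my_multi_malloc. A minimal sketch of that stack-first pattern in plain C, with malloc standing in for my_multi_malloc:

#include <stdlib.h>

#define EXTENTS_ON_STACK 32              /* mirrors ROW_EXTENTS_ON_STACK */

struct log_part { const char *str; size_t length; };

static int write_parts(unsigned extent_count)
{
  struct log_part stack_parts[EXTENTS_ON_STACK + 2];
  struct log_part *parts= stack_parts;
  int heap_allocated= 0;

  if (extent_count >= EXTENTS_ON_STACK)  /* rare case: large row, use the heap */
  {
    if (!(parts= malloc((extent_count + 2) * sizeof(*parts))))
      return 1;
    heap_allocated= 1;
  }

  parts[0].str= "header";                /* ... fill in and log the parts ... */
  parts[0].length= 6;

  if (heap_allocated)
    free(parts);
  return 0;
}

int main(void) { return write_parts(4); }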
the UPDATE */ + size_t row_length; + uint row_parts_count; + row_length= fill_update_undo_parts(info, old_record, record, + info->log_row_parts + + TRANSLOG_INTERNAL_PARTS + 1, + &row_parts_count); + if (translog_write_record(&info->trn->undo_lsn, LOGREC_UNDO_ROW_UPDATE, + info->trn->short_id, NULL, share, + sizeof(log_data) + row_length, + TRANSLOG_INTERNAL_PARTS + 1 + row_parts_count, + log_array)) + goto disk_err; + } + } + + _ma_unpin_all_pages(info, info->trn->undo_lsn); if (tmp_data_used) { @@ -1688,8 +2129,8 @@ static my_bool write_block_record(MARIA_HA *info, const byte *record, This is the char/varchar data that didn't fit into the head page. */ DBUG_ASSERT(bitmap_blocks->count != 0); - if (write_full_pages(info, head_block + 1, info->rec_buff, - (ulong) (tmp_data - info->rec_buff))) + if (write_full_pages(info, info->trn->undo_lsn, head_block + 1, + info->rec_buff, (ulong) (tmp_data - info->rec_buff))) goto disk_err; } @@ -1709,7 +2150,8 @@ static my_bool write_block_record(MARIA_HA *info, const byte *record, if (block[block->sub_blocks - 1].used & BLOCKUSED_TAIL) blob_length-= (blob_length % FULL_PAGE_SIZE(block_size)); - if (write_full_pages(info, block, blob_pos, blob_length)) + if (write_full_pages(info, info->trn->undo_lsn, block, + blob_pos, blob_length)) goto disk_err; block+= block->sub_blocks; } @@ -1719,9 +2161,13 @@ static my_bool write_block_record(MARIA_HA *info, const byte *record, DBUG_RETURN(0); crashed: - my_errno= HA_ERR_WRONG_IN_RECORD; /* File crashed */ + /* Something was wrong with data on page */ + my_errno= HA_ERR_WRONG_IN_RECORD; + disk_err: - /* Something was wrong with data on record */ + /* Unpin all pinned pages to not cause problems for disk cache */ + _ma_unpin_all_pages(info, 0); + DBUG_RETURN(1); } @@ -1754,12 +2200,15 @@ MARIA_RECORD_POS _ma_write_init_block_record(MARIA_HA *info, DBUG_RETURN(HA_OFFSET_ERROR); /* Error reading bitmap */ /* page will be pinned & locked by get_head_or_tail_page */ if (get_head_or_tail_page(info, blocks->block, info->buff, - info->s->base.min_row_length, HEAD_PAGE, &row_pos)) + info->s->base.min_row_length, HEAD_PAGE, + PAGECACHE_LOCK_WRITE, &row_pos)) DBUG_RETURN(HA_OFFSET_ERROR); - info->cur_row.lastpos= ma_recordpos(blocks->block->page, row_pos.offset); + info->cur_row.lastpos= ma_recordpos(blocks->block->page, row_pos.rownr); if (info->s->calc_checksum) info->cur_row.checksum= (info->s->calc_checksum)(info,record); - if (write_block_record(info, record, &info->cur_row, blocks, &row_pos)) + if (write_block_record(info, (byte*) 0, record, &info->cur_row, + blocks, blocks->block->org_bitmap_value != 0, + &row_pos)) DBUG_RETURN(HA_OFFSET_ERROR); /* Error reading bitmap */ DBUG_PRINT("exit", ("Rowid: %lu", (ulong) info->cur_row.lastpos)); info->s->state.split++; @@ -1775,8 +2224,7 @@ MARIA_RECORD_POS _ma_write_init_block_record(MARIA_HA *info, */ my_bool _ma_write_block_record(MARIA_HA *info __attribute__ ((unused)), - const byte *record __attribute__ ((unused)) -) + const byte *record __attribute__ ((unused))) { return 0; /* Row already written */ } @@ -1822,15 +2270,36 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) 0)) res= 1; } - else + else if (block->used & BLOCKUSED_USED) { - pthread_mutex_lock(&info->s->bitmap.bitmap_lock); - if (_ma_reset_full_page_bits(info, &info->s->bitmap, block->page, - block->page_count)) + if (free_full_page_range(info, block->page, block->page_count)) res= 1; - pthread_mutex_unlock(&info->s->bitmap.bitmap_lock); } } + + if (info->s->base.transactional) + { + LEX_STRING 
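The insert/update path above follows a strict order: REDO records for the changed pages are logged while those pages stay pinned, the UNDO record is written last, and only then are the pinned pages released with the UNDO LSN (or with zero after a disk error, as in the disk_err label). A stub-based sketch of that ordering; all function names here are stand-ins, not the real Maria calls:

#include <stdio.h>

typedef unsigned long long LSN;

static int  log_redo(const char *what) { printf("REDO  %s\n", what); return 0; }
static int  log_undo(LSN *undo_lsn)    { *undo_lsn= 42; printf("UNDO\n"); return 0; }
static void unpin_all(LSN lsn)         { printf("unpin, lsn=%llu\n", lsn); }

static int insert_row(void)
{
  LSN undo_lsn= 0;

  if (log_redo("head page") || log_redo("full pages and blobs"))
    goto disk_err;
  if (log_undo(&undo_lsn))     /* UNDO is written last ...                  */
    goto disk_err;
  unpin_all(undo_lsn);         /* ... so its LSN is known when we unpin     */
  return 0;

disk_err:
  unpin_all(0);                /* error path: unpin without an undo LSN     */
  return 1;
}

int main(void) { return insert_row(); }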
log_array[TRANSLOG_INTERNAL_PARTS + 1]; + uchar log_data[LSN_STORE_SIZE]; + + /* + Write UNDO record + This entry is just an end marker for the abort_insert as we will never + really undo a failed insert. Note that this UNDO will cause recover + to ignore the LOGREC_UNDO_ROW_INSERT that is the previous entry + in the UNDO chain. + */ + lsn_store(log_data, info->trn->undo_lsn); + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + if (translog_write_record(&info->trn->undo_lsn, LOGREC_UNDO_ROW_PURGE, + info->trn->short_id, NULL, info->s, + sizeof(log_data), + TRANSLOG_INTERNAL_PARTS + 1, + log_array)) + res= 1; + } + _ma_unpin_all_pages(info, info->trn->undo_lsn); DBUG_RETURN(res); } @@ -1845,28 +2314,33 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) */ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, - const byte *record) + const byte *oldrec, const byte *record) { MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks; byte *buff; MARIA_ROW *cur_row= &info->cur_row, *new_row= &info->new_row; + MARIA_PINNED_PAGE page_link; uint rownr, org_empty_size, head_length; uint block_size= info->s->block_size; byte *dir; ulonglong page; struct st_row_pos_info row_pos; + MARIA_SHARE *share= info->s; DBUG_ENTER("_ma_update_block_record"); DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos)); calc_record_size(info, record, new_row); page= ma_recordpos_to_page(record_pos); - DBUG_ASSERT(info->s->pagecache->block_size == block_size); - if (!(buff= pagecache_read(info->s->pagecache, + DBUG_ASSERT(share->pagecache->block_size == block_size); + if (!(buff= pagecache_read(share->pagecache, &info->dfile, (my_off_t) page, 0, - info->buff, PAGECACHE_PLAIN_PAGE, - PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) + info->buff, share->page_type, + PAGECACHE_LOCK_WRITE, &page_link.link))) DBUG_RETURN(1); + page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK; + push_dynamic(&info->pinned_pages, (void*) &page_link); + org_empty_size= uint2korr(buff + EMPTY_SPACE_OFFSET); rownr= ma_recordpos_to_dir_entry(record_pos); dir= (buff + block_size - DIR_ENTRY_SIZE * rownr - @@ -1878,10 +2352,10 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, MARIA_BITMAP_BLOCK block; /* - We can fit the new row in the same page as the original head part - of the row + We can fit the new row in the same page as the original head part + of the row */ - block.org_bitmap_value= _ma_free_size_to_head_pattern(&info->s->bitmap, + block.org_bitmap_value= _ma_free_size_to_head_pattern(&share->bitmap, org_empty_size); offset= uint2korr(dir); length= uint2korr(dir + 2); @@ -1893,13 +2367,13 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, empty= start_of_next_entry(dir) - (offset + length); if (new_row->total_length > length + empty) { - compact_page(buff, info->s->block_size, rownr, 1); + compact_page(buff, share->block_size, rownr, 1); org_empty_size= 0; length= uint2korr(dir + 2); } } row_pos.buff= buff; - row_pos.offset= rownr; + row_pos.rownr= rownr; row_pos.empty_space= org_empty_size + length; row_pos.dir= dir; row_pos.data= buff + uint2korr(dir); @@ -1912,10 +2386,11 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, block.empty_space= row_pos.empty_space; /* Update cur_row, if someone calls update at once again */ cur_row->head_length= new_row->total_length; - if (_ma_bitmap_free_full_pages(info, cur_row->extents, - cur_row->extents_count)) - 
DBUG_RETURN(1); - DBUG_RETURN(write_block_record(info, record, new_row, blocks, &row_pos)); + + if (free_full_pages(info, cur_row)) + goto err; + DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, + 1, &row_pos)); } /* Allocate all size in block for record @@ -1928,27 +2403,31 @@ my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos, (new_row->total_length <= head_length && org_empty_size + head_length >= new_row->total_length))) { - compact_page(buff, info->s->block_size, rownr, 1); + compact_page(buff, share->block_size, rownr, 1); org_empty_size= 0; head_length= uint2korr(dir + 2); } /* Delete old row */ if (delete_tails(info, cur_row->tail_positions)) - DBUG_RETURN(1); - if (_ma_bitmap_free_full_pages(info, cur_row->extents, - cur_row->extents_count)) - DBUG_RETURN(1); + goto err; + if (free_full_pages(info, cur_row)) + goto err; if (_ma_bitmap_find_new_place(info, new_row, page, head_length, blocks)) - DBUG_RETURN(1); + goto err; row_pos.buff= buff; - row_pos.offset= rownr; + row_pos.rownr= rownr; row_pos.empty_space= org_empty_size + head_length; row_pos.dir= dir; row_pos.data= buff + uint2korr(dir); row_pos.length= head_length; - DBUG_RETURN(write_block_record(info, record, new_row, blocks, &row_pos)); + DBUG_RETURN(write_block_record(info, oldrec, record, new_row, blocks, 1, + &row_pos)); + +err: + _ma_unpin_all_pages(info, 0); + DBUG_RETURN(1); } @@ -1977,6 +2456,8 @@ static my_bool delete_head_or_tail(MARIA_HA *info, uint number_of_records, empty_space, length; uint block_size= share->block_size; byte *buff, *dir; + LSN lsn; + MARIA_PINNED_PAGE page_link; DBUG_ENTER("delete_head_or_tail"); info->keyread_buff_used= 1; @@ -1984,9 +2465,11 @@ static my_bool delete_head_or_tail(MARIA_HA *info, if (!(buff= pagecache_read(share->pagecache, &info->dfile, page, 0, info->keyread_buff, - PAGECACHE_PLAIN_PAGE, - PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) + info->s->page_type, + PAGECACHE_LOCK_WRITE, &page_link.link))) DBUG_RETURN(1); + page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK; + push_dynamic(&info->pinned_pages, (void*) &page_link); number_of_records= (uint) ((uchar *) buff)[DIR_COUNT_OFFSET]; #ifdef SANITY_CHECKS @@ -2021,19 +2504,60 @@ static my_bool delete_head_or_tail(MARIA_HA *info, empty_space+= length; if (number_of_records != 0) { + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; + + /* Update directory */ int2store(buff + EMPTY_SPACE_OFFSET, empty_space); buff[PAGE_TYPE_OFFSET]|= (byte) PAGE_CAN_BE_COMPACTED; DBUG_ASSERT(share->pagecache->block_size == block_size); + + /* Log REDO data */ + fileid_store(log_data, info->dfile.file); + page_store(log_data+ FILEID_STORE_SIZE, page); + dirpos_store(log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE, + record_number); + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + if (translog_write_record(&lsn, + (head ? 
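In _ma_update_block_record() the row is rewritten in place when it still fits in the head page (after compacting the page if needed); otherwise the old extents are freed and the bitmap is asked for a new place. A simplified, hedged sketch of the kind of fit test involved; the real condition also distinguishes the head length from the total row length:

/* Returns 1 when the new row can stay in its current directory slot. */
static int fits_in_same_page(unsigned new_length,
                             unsigned old_length,       /* current entry length   */
                             unsigned empty_after_row,  /* free bytes right after */
                             unsigned page_empty_space) /* all free bytes on page */
{
  if (new_length <= old_length + empty_after_row)
    return 1;                                   /* fits without compacting */
  /* Compacting moves all free space to follow the row, so the row may grow
     into the page's total empty space. */
  return new_length <= old_length + page_empty_space;
}

int main(void)
{
  return fits_in_same_page(120, 100, 10, 40) ? 0 : 1;
}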
LOGREC_REDO_PURGE_ROW_HEAD : + LOGREC_REDO_PURGE_ROW_TAIL), + info->trn->short_id, NULL, share, + sizeof(log_data), TRANSLOG_INTERNAL_PARTS + 1, + log_array)) + DBUG_RETURN(1); if (pagecache_write(share->pagecache, &info->dfile, page, 0, - buff, PAGECACHE_PLAIN_PAGE, - PAGECACHE_LOCK_LEFT_UNLOCKED, - PAGECACHE_PIN_LEFT_UNPINNED, - PAGECACHE_WRITE_DELAY, 0)) + buff, share->page_type, + PAGECACHE_LOCK_WRITE_TO_READ, + PAGECACHE_PIN_LEFT_PINNED, + PAGECACHE_WRITE_DELAY, &page_link.link)) DBUG_RETURN(1); + + /* Change the lock used when we read the page */ + page_link.unlock= PAGECACHE_LOCK_READ_UNLOCK; + set_dynamic(&info->pinned_pages, (void*) &page_link, + info->pinned_pages.elements-1); } else { + uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + PAGE_STORE_SIZE + PAGERANGE_STORE_SIZE]; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; + + fileid_store(log_data, info->dfile.file); + pagerange_store(log_data + FILEID_STORE_SIZE, 1); + page_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE, page); + pagerange_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + + PAGERANGE_STORE_SIZE, 1); + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + if (translog_write_record(&lsn, LOGREC_REDO_PURGE_BLOCKS, + info->trn->short_id, NULL, share, + sizeof(log_data), TRANSLOG_INTERNAL_PARTS + 1, + log_array)) + DBUG_RETURN(1); DBUG_ASSERT(empty_space >= info->s->bitmap.sizes[0]); } DBUG_PRINT("info", ("empty_space: %u", empty_space)); @@ -2081,18 +2605,58 @@ static my_bool delete_tails(MARIA_HA *info, MARIA_RECORD_POS *tails) for rows with many splits. */ -my_bool _ma_delete_block_record(MARIA_HA *info) +my_bool _ma_delete_block_record(MARIA_HA *info, const byte *record) { + ulonglong page; + uint record_number; DBUG_ENTER("_ma_delete_block_record"); - if (delete_head_or_tail(info, - ma_recordpos_to_page(info->cur_row.lastpos), - ma_recordpos_to_dir_entry(info->cur_row.lastpos), - 1) || + + page= ma_recordpos_to_page(info->cur_row.lastpos); + record_number= ma_recordpos_to_dir_entry(info->cur_row.lastpos); + + if (delete_head_or_tail(info, page, record_number, 1) || delete_tails(info, info->cur_row.tail_positions)) - DBUG_RETURN(1); + goto err; + info->s->state.split--; - DBUG_RETURN(_ma_bitmap_free_full_pages(info, info->cur_row.extents, - info->cur_row.extents_count)); + + if (info->cur_row.extents && free_full_pages(info, &info->cur_row)) + goto err; + + { + uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + + DIR_COUNT_SIZE]; + size_t row_length; + uint row_parts_count; + + /* Write UNDO record */ + lsn_store(log_data, info->trn->undo_lsn); + fileid_store(log_data+ LSN_STORE_SIZE, info->dfile.file); + page_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE, page); + dirpos_store(log_data+ LSN_STORE_SIZE + FILEID_STORE_SIZE + + PAGE_STORE_SIZE, record_number); + + info->log_row_parts[TRANSLOG_INTERNAL_PARTS].str= (char*) log_data; + info->log_row_parts[TRANSLOG_INTERNAL_PARTS].length= sizeof(log_data); + row_length= fill_insert_undo_parts(info, record, info->log_row_parts + + TRANSLOG_INTERNAL_PARTS + 1, + &row_parts_count); + + if (translog_write_record(&info->trn->undo_lsn, LOGREC_UNDO_ROW_DELETE, + info->trn->short_id, NULL, info->s, + sizeof(log_data) + row_length, + TRANSLOG_INTERNAL_PARTS + 1 + row_parts_count, + info->log_row_parts)) + goto err; + + } + + _ma_unpin_all_pages(info, info->trn->undo_lsn); + DBUG_RETURN(0); + +err: + _ma_unpin_all_pages(info, 0); + 
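delete_head_or_tail() above logs two different REDO records: a row purge when other rows remain on the page (the page is rewritten with more empty space), and a block purge when the page becomes empty (the whole page goes back to the bitmap). A compact sketch of that branch with stand-in logging calls:

#include <stdio.h>

static void log_purge_row(unsigned long long page, unsigned rownr)
{ printf("REDO_PURGE_ROW    page=%llu rownr=%u\n", page, rownr); }

static void log_purge_blocks(unsigned long long page, unsigned count)
{ printf("REDO_PURGE_BLOCKS page=%llu count=%u\n", page, count); }

static void delete_entry(unsigned long long page, unsigned rownr,
                         unsigned rows_left_on_page)
{
  if (rows_left_on_page)
    log_purge_row(page, rownr);      /* page survives, directory entry freed */
  else
    log_purge_blocks(page, 1);       /* page is now empty, free it entirely  */
}

int main(void)
{
  delete_entry(500, 3, 2);
  delete_entry(501, 0, 0);
  return 0;
}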
DBUG_RETURN(1); } @@ -2173,7 +2737,7 @@ static void init_extent(MARIA_EXTENT_CURSOR *extent, byte *extent_info, extent->extent= extent_info; extent->extent_count= extents; extent->page= uint5korr(extent_info); /* First extent */ - page_count= uint2korr(extent_info+5); + page_count= uint2korr(extent_info + ROW_EXTENT_PAGE_SIZE); extent->page_count= page_count & ~TAIL_BIT; extent->tail= page_count & TAIL_BIT; extent->tail_positions= tail_positions; @@ -2223,21 +2787,10 @@ static byte *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent, extent->tail != 0)); } - if (info->cur_row.empty_bits != info->cur_row.empty_bits_buffer) - { - /* - First read of extents: Move data from info->buff to - internals buffers. - */ - memcpy(info->cur_row.empty_bits_buffer, info->cur_row.empty_bits, - share->base.pack_bytes); - info->cur_row.empty_bits= info->cur_row.empty_bits_buffer; - } - DBUG_ASSERT(share->pagecache->block_size == share->block_size); if (!(buff= pagecache_read(share->pagecache, &info->dfile, extent->page, 0, - info->buff, PAGECACHE_PLAIN_PAGE, + info->buff, share->page_type, PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) { /* check if we tried to read over end of file (ie: bad data in record) */ @@ -2309,6 +2862,19 @@ static my_bool read_long_data(MARIA_HA *info, byte *to, ulong length, DBUG_PRINT("enter", ("length: %lu", length)); DBUG_ASSERT(*data <= *end_of_data); + /* + Fields are never split in middle. This means that if length > rest-of-data + we should start reading from the next extent. The reason we may have + data left on the page is that there fixed part of the row was less than + min_row_length and in this case the head block was extended to + min_row_length. + + This may change in the future, which is why we have the loop written + the way it's written. + */ + if (length > (ulong) (*end_of_data - *data)) + *end_of_data= *data; + for(;;) { uint left_length; @@ -2347,7 +2913,7 @@ static my_bool read_long_data(MARIA_HA *info, byte *to, ulong length, cur_row.tail_positions is set to point to all tail blocks cur_row.extents points to extents data cur_row.extents_counts contains number of extents - cur_row.empty_bits points to empty bits part in read record + cur_row.empty_bits is set to empty bits cur_row.field_lengths contains packed length of all fields RETURN @@ -2415,6 +2981,7 @@ int _ma_read_block_record2(MARIA_HA *info, byte *record, if (share->base.max_field_lengths) { get_key_length(field_lengths, data); + info->cur_row.field_lengths_length= field_lengths; #ifdef SANITY_CHECKS if (field_lengths > share->base.max_field_lengths) goto err; @@ -2434,7 +3001,8 @@ int _ma_read_block_record2(MARIA_HA *info, byte *record, bzero(record + cur_null_bytes, (uint) (null_bytes - cur_null_bytes)); } data+= null_bytes; - info->cur_row.empty_bits= (byte*) data; /* Pointer to empty bitmask */ + /* We copy the empty bits to be able to use them for delete/update */ + memcpy(info->cur_row.empty_bits, data, share->base.pack_bytes); data+= share->base.pack_bytes; /* TODO: Use field offsets, instead of just skipping them */ @@ -2454,7 +3022,7 @@ int _ma_read_block_record2(MARIA_HA *info, byte *record, /* Data now points to start of fixed length field data that can't be null - or 'empty'. Note that these fields can't be split over blocks + or 'empty'. Note that these fields can't be split over blocks. 
*/ for (column= share->columndef, end_column= column + share->base.fixed_not_null_fields; @@ -2661,7 +3229,7 @@ int _ma_read_block_record(MARIA_HA *info, byte *record, DBUG_ASSERT(info->s->pagecache->block_size == block_size); if (!(buff= pagecache_read(info->s->pagecache, &info->dfile, ma_recordpos_to_page(record_pos), 0, - info->buff, PAGECACHE_PLAIN_PAGE, + info->buff, info->s->page_type, PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) DBUG_RETURN(1); DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == HEAD_PAGE); @@ -2780,10 +3348,10 @@ void _ma_scan_end_block_record(MARIA_HA *info) use a variable in info->scan IMPLEMENTATION - Current code uses a lot of goto's to separate the different kind of - states we may be in. This gives us a minimum of executed if's for - the normal cases. I tried several different ways to code this, but - the current one was in the end the most readable and fastest. + Current code uses a lot of goto's to separate the different kind of + states we may be in. This gives us a minimum of executed if's for + the normal cases. I tried several different ways to code this, but + the current one was in the end the most readable and fastest. RETURN 0 ok @@ -2796,6 +3364,7 @@ int _ma_scan_block_record(MARIA_HA *info, byte *record, { uint block_size; my_off_t filepos; + MARIA_SHARE *share= info->s; DBUG_ENTER("_ma_scan_block_record"); restart_record_read: @@ -2823,7 +3392,7 @@ restart_record_read: info->scan.dir-= DIR_ENTRY_SIZE; /* Point to previous row */ #ifdef SANITY_CHECKS if (end_of_data > info->scan.dir_end || - offset < PAGE_HEADER_SIZE || length < info->s->base.min_block_length) + offset < PAGE_HEADER_SIZE || length < share->base.min_block_length) goto err; #endif DBUG_PRINT("info", ("rowid: %lu", (ulong) info->cur_row.lastpos)); @@ -2832,7 +3401,7 @@ restart_record_read: /* Find next head page in current bitmap */ restart_bitmap_scan: - block_size= info->s->block_size; + block_size= share->block_size; if (likely(info->scan.bitmap_pos < info->scan.bitmap_end)) { byte *data= info->scan.bitmap_pos; @@ -2856,10 +3425,10 @@ restart_bitmap_scan: page= (info->scan.bitmap_page + 1 + (data - info->scan.bitmap_buff) / 6 * 16 + bit_pos - 1); info->scan.row_base_page= ma_recordpos(page, 0); - if (!(pagecache_read(info->s->pagecache, + if (!(pagecache_read(share->pagecache, &info->dfile, page, 0, info->scan.page_buff, - PAGECACHE_PLAIN_PAGE, + share->page_type, PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) DBUG_RETURN(my_errno); if (((info->scan.page_buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != @@ -2890,13 +3459,13 @@ restart_bitmap_scan: } /* Read next bitmap */ - info->scan.bitmap_page+= info->s->bitmap.pages_covered; + info->scan.bitmap_page+= share->bitmap.pages_covered; filepos= (my_off_t) info->scan.bitmap_page * block_size; if (unlikely(filepos >= info->state->data_file_length)) { DBUG_RETURN((my_errno= HA_ERR_END_OF_FILE)); } - if (!(pagecache_read(info->s->pagecache, &info->dfile, + if (!(pagecache_read(share->pagecache, &info->dfile, info->scan.bitmap_page, 0, info->scan.bitmap_buff, PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_LEFT_UNLOCKED, 0))) @@ -2960,3 +3529,438 @@ static void _ma_print_directory(byte *buff, uint block_size) } #endif /* DBUG_OFF */ + +/* + Store an integer with simple packing + + SYNOPSIS + ma_store_integer() + to Store the packed integer here + nr Integer to store + + NOTES + This is mostly used to store field numbers and lengths of strings. + We have to cast the result for the LL() becasue of a bug in Forte CC + compiler. 
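The new comment in read_long_data() explains that fields are never split in the middle of a block, so if the next field is longer than what is left, the remaining bytes are only padding up to min_row_length and the reader should move on to the next extent. A tiny sketch of that adjustment; pointer names are simplified:

/* If the field is longer than what is left in the current block, the rest of
   the block is only padding; pretend the block is exhausted so the caller
   switches to the next extent. */
static void skip_padding_if_needed(const unsigned char **data,
                                   const unsigned char **end_of_data,
                                   unsigned long field_length)
{
  if (field_length > (unsigned long) (*end_of_data - *data))
    *end_of_data= *data;
}

int main(void)
{
  unsigned char block[16];
  const unsigned char *data= block, *end= block + 10;
  skip_padding_if_needed(&data, &end, 64);   /* field does not fit: end == data */
  return end == data ? 0 : 1;
}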
+ + Packing used is: + nr < 251 is stored as is (in 1 byte) + Numbers that require 1-4 bytes are stored as char(250+byte_length), data + Bigger numbers are stored as 255, data as ulonglong (not yet done). + + RETURN + Position in 'to' after the packed length +*/ + +uchar *ma_store_length(uchar *to, ulong nr) +{ + if (nr < 251) + { + *to=(uchar) nr; + return to+1; + } + if (nr < 65536) + { + if (nr <= 255) + { + to[0]= (uchar) 251; + to[1]= (uchar) nr; + return to+2; + } + to[0]= (uchar) 252; + int2store(to+1, nr); + return to+3; + } + if (nr < 16777216) + { + *to++= (uchar) 253; + int3store(to, nr); + return to+3; + } + *to++= (uchar) 254; + int4store(to, nr); + return to+4; +} + + +/* Calculate how many bytes needed to store a number */ + +uint ma_calc_length_for_store_length(ulong nr) +{ + if (nr < 251) + return 1; + if (nr < 65536) + { + if (nr <= 255) + return 2; + return 3; + } + if (nr < 16777216) + return 4; + return 5; +} + + +/* + Fill array with pointers to field parts to be stored in log for insert + + SYNOPSIS + fill_insert_undo_parts() + info Maria handler + record Inserted row + log_parts Store pointers to changed memory areas here + log_parts_count See RETURN + + NOTES + We have information in info->cur_row about the read row. + + RETURN + length of data in log_parts. + log_parts_count contains number of used log_parts +*/ + +static size_t fill_insert_undo_parts(MARIA_HA *info, const byte *record, + LEX_STRING *log_parts, + uint *log_parts_count) +{ + MARIA_SHARE *share= info->s; + MARIA_COLUMNDEF *column, *end_column; + uchar *field_lengths= info->cur_row.field_lengths; + size_t row_length; + MARIA_ROW *cur_row= &info->cur_row; + LEX_STRING *start_log_parts; + DBUG_ENTER("fill_insert_undo_parts"); + + start_log_parts= log_parts; + + /* Store null bits */ + log_parts->str= (char*) record; + log_parts->length= share->base.null_bytes; + row_length= log_parts->length; + log_parts++; + + /* Stored bitmap over packed (zero length or all-zero fields) */ + start_log_parts= log_parts; + log_parts->str= info->cur_row.empty_bits; + log_parts->length= share->base.pack_bytes; + row_length+= log_parts->length; + log_parts++; + + if (share->base.max_field_lengths) + { + /* Store field lenghts, with a prefix of number of bytes */ + log_parts->str= field_lengths-2; + log_parts->length= info->cur_row.field_lengths_length+2; + int2store(log_parts->str, info->cur_row.field_lengths_length); + row_length+= log_parts->length; + log_parts++; + } + + /* Handle constant length fields that are always present */ + for (column= share->columndef, + end_column= column+ share->base.fixed_not_null_fields; + column < end_column; + column++) + { + log_parts->str= (char*) record + column->offset; + log_parts->length= column->length; + row_length+= log_parts->length; + log_parts++; + } + + /* Handle NULL fields and CHAR/VARCHAR fields */ + for (end_column= share->columndef + share->base.fields - share->base.blobs; + column < end_column; + column++) + { + const uchar *column_pos; + size_t column_length; + if ((record[column->null_pos] & column->null_bit) || + cur_row->empty_bits[column->empty_pos] & column->empty_bit) + continue; + + column_pos= record+ column->offset; + column_length= column->length; + + switch ((enum en_fieldtype) column->type) { + case FIELD_CHECK: + case FIELD_NORMAL: /* Fixed length field */ + case FIELD_ZERO: + case FIELD_SKIP_PRESPACE: /* Not packed */ + case FIELD_SKIP_ZERO: /* Fixed length field */ + break; + case FIELD_SKIP_ENDSPACE: /* CHAR */ + { + if (column->length <= 255) + 
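ma_store_length() packs small numbers into 1 to 5 bytes as described in the comment above. A stand-alone round trip under that scheme; the unpacker is not part of the patch and is only here to make the format concrete (byte order assumed little-endian, as with the intNstore macros):

#include <assert.h>
#include <stdio.h>

/* Pack: < 251 is stored as one byte; otherwise a marker byte 251..254 is
   followed by the value in 1, 2, 3 or 4 little-endian bytes. */
static unsigned char *pack_length(unsigned char *to, unsigned long nr)
{
  if (nr < 251)      { *to= (unsigned char) nr; return to + 1; }
  if (nr <= 255)     { to[0]= 251; to[1]= (unsigned char) nr; return to + 2; }
  if (nr < 65536)    { to[0]= 252; to[1]= nr & 0xff; to[2]= nr >> 8; return to + 3; }
  if (nr < 16777216) { to[0]= 253; to[1]= nr & 0xff; to[2]= (nr >> 8) & 0xff;
                       to[3]= nr >> 16; return to + 4; }
  to[0]= 254; to[1]= nr & 0xff; to[2]= (nr >> 8) & 0xff;
  to[3]= (nr >> 16) & 0xff; to[4]= nr >> 24; return to + 5;
}

static const unsigned char *unpack_length(const unsigned char *from,
                                          unsigned long *nr)
{
  unsigned bytes;
  if (*from < 251) { *nr= *from; return from + 1; }
  bytes= *from - 250;                 /* 251 -> 1 byte, ..., 254 -> 4 bytes */
  from++;
  *nr= 0;
  for (unsigned i= 0; i < bytes; i++)
    *nr|= (unsigned long) from[i] << (8 * i);
  return from + bytes;
}

int main(void)
{
  unsigned long tests[]= { 7, 250, 251, 255, 1000, 70000, 20000000 }, nr;
  unsigned char buff[5];
  for (unsigned i= 0; i < sizeof(tests) / sizeof(*tests); i++)
  {
    pack_length(buff, tests[i]);
    unpack_length(buff, &nr);
    assert(nr == tests[i]);
  }
  printf("ok\n");
  return 0;
}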
column_length= *field_lengths++; + else + { + column_length= uint2korr(field_lengths); + field_lengths+= 2; + } + break; + } + case FIELD_VARCHAR: + { + if (column->length <= 256) + { + column_length= *field_lengths; + field_lengths++; + } + else + { + column_length= uint2korr(field_lengths); + field_lengths+= 2; + } + break; + } + default: + DBUG_ASSERT(0); + } + log_parts->str= (char*) column_pos; + log_parts->length= column_length; + row_length+= log_parts->length; + log_parts++; + } + + /* Add blobs */ + for (end_column+= share->base.blobs; column < end_column; column++) + { + const byte *field_pos= record + column->offset; + uint size_length= column->length - portable_sizeof_char_ptr; + ulong blob_length= _ma_calc_blob_length(size_length, field_pos); + + /* + We don't have to check for null, as blob_length is guranteed to be 0 + if the blob is null + */ + if (blob_length) + { + char *blob_pos; + memcpy_fixed((byte*) &blob_pos, record + column->offset + size_length, + sizeof(blob_pos)); + log_parts->str= blob_pos; + log_parts->length= blob_length; + row_length+= log_parts->length; + log_parts++; + } + } + *log_parts_count= (log_parts - start_log_parts); + DBUG_RETURN(row_length); +} + + +/* + Fill array with pointers to field parts to be stored in log for update + + SYNOPSIS + fill_update_undo_parts() + info Maria handler + oldrec Original row + newrec New row + log_parts Store pointers to changed memory areas here + log_parts_count See RETURN + + IMPLEMENTATION + Format of undo record: + + Fields are stored in same order as the field array. + + Number of changed fields (packed) + + For each changed field + Fieldnumber (packed) + Length, if variable length field (packed) + + For each changed field + Data + + Packing is using ma_store_integer() + + The reason we store field numbers & length separated from data (ie, not + after each other) is to get better cpu caching when we loop over + fields (as we probably don't have to access data for each field when we + want to read and old row through the undo log record). + + As a special case, we use '255' for the field number of the null bitmap. + + RETURN + length of data in log_parts. + log_parts_count contains number of used log_parts +*/ + +static size_t fill_update_undo_parts(MARIA_HA *info, const byte *oldrec, + const byte *newrec, + LEX_STRING *log_parts, + uint *log_parts_count) +{ + MARIA_SHARE *share= info->s; + MARIA_COLUMNDEF *column, *end_column; + MARIA_ROW *old_row= &info->cur_row, *new_row= &info->new_row; + uchar *field_data, *start_field_data; + uchar *old_field_lengths= old_row->field_lengths; + uchar *new_field_lengths= new_row->field_lengths; + size_t row_length; + uint field_count= 0; + LEX_STRING *start_log_parts; + my_bool new_column_is_empty; + DBUG_ENTER("fill_update_undo_parts"); + + start_log_parts= log_parts; + + /* + First log part is for number of fields, field numbers and lengths + The +4 is to reserve place for the number of changed fields. 
+ */ + start_field_data= field_data= info->update_field_data + 4; + log_parts++; + + if (memcmp(oldrec, newrec, share->base.null_bytes)) + { + /* Store changed null bits */ + *field_data++= (uchar) 255; /* Special case */ + field_count++; + log_parts->str= (char*) oldrec; + log_parts->length= share->base.null_bytes; + row_length= log_parts->length; + log_parts++; + } + + /* Handle constant length fields */ + for (column= share->columndef, + end_column= column+ share->base.fixed_not_null_fields; + column < end_column; + column++) + { + if (memcmp(oldrec + column->offset, newrec + column->offset, + column->length)) + { + field_data= ma_store_length(field_data, + (uint) (column - share->columndef)); + field_count++; + log_parts->str= (char*) oldrec + column->offset; + log_parts->length= column->length; + row_length+= log_parts->length; + log_parts++; + } + } + + /* Handle the rest: NULL fields and CHAR/VARCHAR fields and BLOB's */ + for (end_column= share->columndef + share->base.fields; + column < end_column; + column++) + { + const uchar *new_column_pos, *old_column_pos; + size_t new_column_length, old_column_length; + + /* First check if old column is null or empty */ + if (oldrec[column->null_pos] & column->null_bit) + { + /* + It's safe to skip this one as either the new column is also null + (no change) or the new_column is not null, in which case the null-bit + maps differed and we have already stored the null bitmap. + */ + continue; + } + if (old_row->empty_bits[column->empty_pos] & column->empty_bit) + { + if (new_row->empty_bits[column->empty_pos] & column->empty_bit) + continue; /* Both are empty; skip */ + + /* Store null length column */ + field_data= ma_store_length(field_data, + (uint) (column - share->columndef)); + field_data= ma_store_length(field_data, 0); + field_count++; + continue; + } + /* + Remember if the 'new' value is empty (as in this case we must always + log the original value + */ + new_column_is_empty= ((newrec[column->null_pos] & column->null_bit) || + (new_row->empty_bits[column->empty_pos] & + column->empty_bit)); + + old_column_pos= oldrec + column->offset; + new_column_pos= newrec + column->offset; + old_column_length= new_column_length= column->length; + + switch ((enum en_fieldtype) column->type) { + case FIELD_CHECK: + case FIELD_NORMAL: /* Fixed length field */ + case FIELD_ZERO: + case FIELD_SKIP_PRESPACE: /* Not packed */ + case FIELD_SKIP_ZERO: /* Fixed length field */ + break; + case FIELD_VARCHAR: + new_column_length--; /* Skip length prefix */ + /* Fall through */ + case FIELD_SKIP_ENDSPACE: /* CHAR */ + { + if (new_column_length <= 255) + { + old_column_length= *old_field_lengths++; + if (!new_column_is_empty) + new_column_length= *new_field_lengths++; + } + else + { + old_column_length= uint2korr(old_field_lengths); + old_field_lengths+= 2; + if (!new_column_is_empty) + { + new_column_length= uint2korr(new_field_lengths); + new_field_lengths+= 2; + } + } + break; + } + case FIELD_BLOB: + { + uint size_length= column->length - portable_sizeof_char_ptr; + old_column_length= _ma_calc_blob_length(size_length, old_column_pos); + memcpy_fixed((byte*) &old_column_pos, + oldrec + column->offset + size_length, + sizeof(old_column_pos)); + if (!new_column_is_empty) + { + new_column_length= _ma_calc_blob_length(size_length, new_column_pos); + memcpy_fixed((byte*) &new_column_pos, + newrec + column->offset + size_length, + sizeof(old_column_pos)); + } + break; + } + default: + DBUG_ASSERT(0); + } + + if (new_column_is_empty || new_column_length != 
old_column_length || + memcmp(old_column_pos, new_column_pos, new_column_length)) + { + field_data= ma_store_length(field_data, + (uint) (column - share->columndef)); + field_data= ma_store_length(field_data, old_column_length); + field_count++; + + log_parts->str= (char*) old_column_pos; + log_parts->length= old_column_length; + row_length+= log_parts->length; + log_parts++; + } + } + + *log_parts_count= (log_parts - start_log_parts); + + /* Store number of fields before the field/field_lengths */ + start_log_parts->str= ((char*) + (start_field_data - + ma_calc_length_for_store_length(field_count))); + ma_store_length(start_log_parts->str, field_count); + start_log_parts->length= (size_t) ((char*) field_data - + start_log_parts->str); + row_length+= start_log_parts->length; + DBUG_RETURN(row_length); +} diff --git a/storage/maria/ma_blockrec.h b/storage/maria/ma_blockrec.h index 9e251a8c59d..f45250ff39c 100644 --- a/storage/maria/ma_blockrec.h +++ b/storage/maria/ma_blockrec.h @@ -102,6 +102,9 @@ enum en_page_type { UNALLOCATED_PAGE, HEAD_PAGE, TAIL_PAGE, BLOB_PAGE, MAX_PAGE_ */ #define MAX_TAIL_SIZE(block_size) ((block_size) *3 / 4) +/* Don't allocate memory for too many row extents on the stack */ +#define ROW_EXTENTS_ON_STACK 32 + extern uchar maria_bitmap_marker[2]; /* Functions to convert MARIA_RECORD_POS to/from page:offset */ @@ -130,8 +133,8 @@ my_bool _ma_init_block_record(MARIA_HA *info); void _ma_end_block_record(MARIA_HA *info); my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS pos, - const byte *record); -my_bool _ma_delete_block_record(MARIA_HA *info); + const byte *oldrec, const byte *newrec); +my_bool _ma_delete_block_record(MARIA_HA *info, const byte *record); int _ma_read_block_record(MARIA_HA *info, byte *record, MARIA_RECORD_POS record_pos); int _ma_read_block_record2(MARIA_HA *info, byte *record, diff --git a/storage/maria/ma_check.c b/storage/maria/ma_check.c index 4087f12ba43..82ad67c2452 100644 --- a/storage/maria/ma_check.c +++ b/storage/maria/ma_check.c @@ -4507,7 +4507,8 @@ static int sort_delete_record(MARIA_SORT_PARAM *sort_param) if (sort_param->calc_checksum) param->glob_crc-=(*info->s->calc_checksum)(info, sort_param->record); } - error=flush_io_cache(&info->rec_cache) || (*info->s->delete_record)(info); + error= (flush_io_cache(&info->rec_cache) || + (*info->s->delete_record)(info, sort_param->record)); info->dfile.file= old_file; /* restore actual value */ info->state->records--; DBUG_RETURN(error); diff --git a/storage/maria/ma_control_file.h b/storage/maria/ma_control_file.h index 159cd15b3d6..4728d719b2f 100644 --- a/storage/maria/ma_control_file.h +++ b/storage/maria/ma_control_file.h @@ -56,6 +56,9 @@ typedef enum enum_control_file_error { #define CONTROL_FILE_UPDATE_ONLY_LSN 1 #define CONTROL_FILE_UPDATE_ONLY_LOGNO 2 +#ifdef __cplusplus +extern "C" { +#endif /* Looks for the control file. If absent, it's a fresh start, create file. 
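fill_update_undo_parts() above keeps the packed field numbers and old lengths together at the start of the UNDO record and puts the old data after them, so recovery can scan the header without touching the data. A toy stand-alone sketch of building that header-first, data-after layout for two fixed-width rows; the real record also covers the null bitmap (field number 255) and variable-length fields, and uses the packing shown earlier instead of single bytes:

#include <stdio.h>

#define FIELDS 4

int main(void)
{
  /* Two "rows" of four one-byte fields each. */
  const unsigned char oldrec[FIELDS]= { 'a', 'b', 'c', 'd' };
  const unsigned char newrec[FIELDS]= { 'a', 'X', 'c', 'Y' };

  unsigned char header[1 + 2 * FIELDS];   /* count + (fieldnr, length) pairs  */
  unsigned char data[FIELDS];             /* old values of the changed fields */
  unsigned header_pos= 1, data_pos= 0, changed= 0;

  for (unsigned i= 0; i < FIELDS; i++)
  {
    if (oldrec[i] == newrec[i])
      continue;
    header[header_pos++]= (unsigned char) i;   /* field number               */
    header[header_pos++]= 1;                   /* old length                 */
    data[data_pos++]= oldrec[i];               /* old data goes after header */
    changed++;
  }
  header[0]= (unsigned char) changed;          /* number of changed fields   */

  printf("changed=%u header=%u bytes data=%u bytes\n",
         changed, header_pos, data_pos);
  return 0;
}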
@@ -74,3 +77,7 @@ int ma_control_file_write_and_force(const LSN checkpoint_lsn, uint32 logno, /* Free resources taken by control file subsystem */ int ma_control_file_end(); + +#ifdef __cplusplus +} +#endif diff --git a/storage/maria/ma_delete.c b/storage/maria/ma_delete.c index f7b11cb6f48..067dd060a92 100644 --- a/storage/maria/ma_delete.c +++ b/storage/maria/ma_delete.c @@ -89,7 +89,7 @@ int maria_delete(MARIA_HA *info,const byte *record) } } - if ((*share->delete_record)(info)) + if ((*share->delete_record)(info, record)) goto err; /* Remove record from database */ /* diff --git a/storage/maria/ma_dynrec.c b/storage/maria/ma_dynrec.c index d6f7309cafa..ebf84032106 100644 --- a/storage/maria/ma_dynrec.c +++ b/storage/maria/ma_dynrec.c @@ -237,6 +237,7 @@ my_bool _ma_write_dynamic_record(MARIA_HA *info, const byte *record) } my_bool _ma_update_dynamic_record(MARIA_HA *info, MARIA_RECORD_POS pos, + const byte *oldrec __attribute__ ((unused)), const byte *record) { uint length= _ma_rec_pack(info, info->rec_buff + MARIA_REC_BUFF_OFFSET, @@ -277,6 +278,7 @@ my_bool _ma_write_blob_record(MARIA_HA *info, const byte *record) my_bool _ma_update_blob_record(MARIA_HA *info, MARIA_RECORD_POS pos, + const byte *oldrec __attribute__ ((unused)), const byte *record) { byte *rec_buff; @@ -309,7 +311,8 @@ my_bool _ma_update_blob_record(MARIA_HA *info, MARIA_RECORD_POS pos, } -my_bool _ma_delete_dynamic_record(MARIA_HA *info) +my_bool _ma_delete_dynamic_record(MARIA_HA *info, + const byte *record __attribute__ ((unused))) { return delete_dynamic_record(info, info->cur_row.lastpos, 0); } @@ -1371,7 +1374,7 @@ int _ma_read_dynamic_record(MARIA_HA *info, byte *buf, MARIA_RECORD_POS filepos) { int block_of_record; - uint b_type,left_length; + uint b_type; MARIA_BLOCK_INFO block_info; File file; DBUG_ENTER("_ma_read_dynamic_record"); diff --git a/storage/maria/ma_init.c b/storage/maria/ma_init.c index 271eac6c6d1..8f7cdf291ae 100644 --- a/storage/maria/ma_init.c +++ b/storage/maria/ma_init.c @@ -43,6 +43,7 @@ int maria_init(void) maria_inited= TRUE; pthread_mutex_init(&THR_LOCK_maria,MY_MUTEX_INIT_SLOW); _ma_init_block_record_data(); + loghandler_init(); } return 0; } diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c index 198be85ab8c..d16595be24e 100644 --- a/storage/maria/ma_loghandler.c +++ b/storage/maria/ma_loghandler.c @@ -1,4 +1,5 @@ #include "maria_def.h" +#include "ma_blockrec.h" /* number of opened log files in the pagecache (should be at least 2) */ #define OPENED_FILES_NUM 3 @@ -34,14 +35,6 @@ -/* record part descriptor */ -struct st_translog_part -{ - translog_size_t len; - byte *buff; -}; - - /* record parts descriptor */ struct st_translog_parts { @@ -49,10 +42,12 @@ struct st_translog_parts translog_size_t record_length; /* full record length with chunk headers */ translog_size_t total_record_length; - /* array of parts (st_translog_part) */ - DYNAMIC_ARRAY parts; /* current part index */ uint current; + /* total number of elements in parts */ + uint elements; + /* array of parts (LEX_STRING) */ + LEX_STRING *parts; }; /* log write buffer descriptor */ @@ -175,7 +170,7 @@ enum record_class #define TRANSLOG_CLSN_MAX_LEN 5 /* Maximum length of compressed LSN */ typedef my_bool(*prewrite_rec_hook) (enum translog_record_type type, - void *tcb, + void *tcb, struct st_maria_share *share, struct st_translog_parts *parts); typedef my_bool(*inwrite_rec_hook) (enum translog_record_type type, @@ -213,139 +208,209 @@ struct st_log_record_type_descriptor }; -static struct 
st_log_record_type_descriptor - log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]= +/* + Initialize log_record_type_descriptors + + NOTE that after first public Maria release, these can NOT be changed +*/ + +typedef struct st_log_record_type_descriptor LOG_DESC; + +static LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]; + +static LOG_DESC INIT_LOGREC_RESERVED_FOR_CHUNKS23= +{ LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0 }; + +static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_HEAD= +{LOGRECTYPE_VARIABLE_LENGTH, 0, + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_TAIL= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOB= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, NULL, NULL, 0}; + +/*QQQ:TODO:header???*/ +static LOG_DESC INIT_LOGREC_REDO_INSERT_ROW_BLOBS= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_HEAD= +{LOGRECTYPE_FIXEDLENGTH, + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_PURGE_ROW_TAIL= +{LOGRECTYPE_FIXEDLENGTH, + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + NULL, NULL, NULL, 0}; + +/* QQQ: TODO: variable and fixed size??? */ +static LOG_DESC INIT_LOGREC_REDO_PURGE_BLOCKS= +{LOGRECTYPE_VARIABLE_LENGTH, + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + PAGE_STORE_SIZE + + PAGERANGE_STORE_SIZE, + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE + PAGE_STORE_SIZE + + PAGERANGE_STORE_SIZE, + NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_DELETE_ROW= +{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_UPDATE_ROW_HEAD= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_INDEX= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_UNDELETE_ROW= +{LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_CLR_END= +{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1}; + +static LOG_DESC INIT_LOGREC_PURGE_END= +{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1}; + +static LOG_DESC INIT_LOGREC_UNDO_ROW_INSERT= +{LOGRECTYPE_FIXEDLENGTH, + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_UNDO_ROW_DELETE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_UNDO_ROW_UPDATE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE, + NULL, NULL, NULL, 2}; + +static LOG_DESC INIT_LOGREC_UNDO_ROW_PURGE= +{LOGRECTYPE_PSEUDOFIXEDLENGTH, LSN_STORE_SIZE, LSN_STORE_SIZE, + NULL, NULL, NULL, 1}; + +static LOG_DESC INIT_LOGREC_UNDO_KEY_INSERT= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 10, NULL, NULL, NULL, 1}; + +static LOG_DESC INIT_LOGREC_UNDO_KEY_DELETE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 15, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_PREPARE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_PREPARE_WITH_UNDO_PURGE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 5, NULL, NULL, NULL, 1}; + +static LOG_DESC 
INIT_LOGREC_COMMIT= +{LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_COMMIT_WITH_UNDO_PURGE= +{LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1}; + +static LOG_DESC INIT_LOGREC_CHECKPOINT_PAGE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 6, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_CHECKPOINT_TRAN= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_CHECKPOINT_TABL= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_CREATE_TABLE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_RENAME_TABLE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_DROP_TABLE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_REDO_TRUNCATE_TABLE= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_FILE_ID= +{LOGRECTYPE_VARIABLE_LENGTH, 0, 4, NULL, NULL, NULL, 0}; + +static LOG_DESC INIT_LOGREC_LONG_TRANSACTION_ID= +{LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0}; + + +void loghandler_init() { - /*LOGREC_RESERVED_FOR_CHUNKS23= 0 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_INSERT_ROW_HEAD= 1 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_INSERT_ROW_TAIL= 2 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_INSERT_ROW_BLOB= 3 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_INSERT_ROW_BLOBS= 4 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 10, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_PURGE_ROW= 5 */ - {LOGRECTYPE_FIXEDLENGTH, 9, 9, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_PURGE_BLOCKS= 6 */ - {LOGRECTYPE_FIXEDLENGTH, 10, 10, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_DELETE_ROW= 7 */ - {LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_UPDATE_ROW_HEAD= 8 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_INDEX= 9 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 9, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_UNDELETE_ROW= 10 */ - {LOGRECTYPE_FIXEDLENGTH, 16, 16, NULL, NULL, NULL, 0}, - /*LOGREC_CLR_END= 11 */ - {LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1}, - /*LOGREC_PURGE_END= 12 */ - {LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1}, - /*LOGREC_UNDO_ROW_INSERT= 13 */ - {LOGRECTYPE_PSEUDOFIXEDLENGTH, 14, 14, NULL, NULL, NULL, 1}, - /*LOGREC_UNDO_ROW_DELETE= 14 */ - {LOGRECTYPE_PSEUDOFIXEDLENGTH, 19, 19, NULL, NULL, NULL, 2}, - /*LOGREC_UNDO_ROW_UPDATE= 15 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 14, NULL, NULL, NULL, 2}, - /*LOGREC_UNDO_KEY_INSERT= 16 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 10, NULL, NULL, NULL, 1}, - /*LOGREC_UNDO_KEY_DELETE= 17 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 15, NULL, NULL, NULL, 2}, - /*LOGREC_PREPARE= 18 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_PREPARE_WITH_UNDO_PURGE= 19 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 5, NULL, NULL, NULL, 1}, - /*LOGREC_COMMIT= 20 */ - {LOGRECTYPE_FIXEDLENGTH, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_COMMIT_WITH_UNDO_PURGE= 21 */ - {LOGRECTYPE_PSEUDOFIXEDLENGTH, 5, 5, NULL, NULL, NULL, 1}, - /*LOGREC_CHECKPOINT_PAGE= 22 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 6, NULL, NULL, NULL, 0}, - /*LOGREC_CHECKPOINT_TRAN= 23 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_CHECKPOINT_TABL= 24 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 8, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_CREATE_TABLE= 25 */ - 
{LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_RENAME_TABLE= 26 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_DROP_TABLE= 27 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_REDO_TRUNCATE_TABLE= 28 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_FILE_ID= 29 */ - {LOGRECTYPE_VARIABLE_LENGTH, 0, 4, NULL, NULL, NULL, 0}, - /*LOGREC_LONG_TRANSACTION_ID= 30 */ - {LOGRECTYPE_FIXEDLENGTH, 6, 6, NULL, NULL, NULL, 0}, - /*31 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*32 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*33 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*34 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*35 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*36 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*37 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*38 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*39 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*40 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*41 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*42 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*43 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*44 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*45 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*46 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*47 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*48 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*49 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*50 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*51 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*52 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*53 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*54 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*55 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*56 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*57 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*58 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*59 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*60 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*61 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*62 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0}, - /*LOGREC_RESERVED_FUTURE_EXTENSION= 63 */ - {LOGRECTYPE_NOT_ALLOWED, 0, 0, NULL, NULL, NULL, 0} + log_record_type_descriptor[LOGREC_RESERVED_FOR_CHUNKS23]= + INIT_LOGREC_RESERVED_FOR_CHUNKS23; + log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_HEAD]= + INIT_LOGREC_REDO_INSERT_ROW_HEAD; + log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_TAIL]= + INIT_LOGREC_REDO_INSERT_ROW_TAIL; + log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_BLOB]= + INIT_LOGREC_REDO_INSERT_ROW_BLOB; + log_record_type_descriptor[LOGREC_REDO_INSERT_ROW_BLOBS]= + INIT_LOGREC_REDO_INSERT_ROW_BLOBS; + log_record_type_descriptor[LOGREC_REDO_PURGE_ROW_HEAD]= + INIT_LOGREC_REDO_PURGE_ROW_HEAD; + log_record_type_descriptor[LOGREC_REDO_PURGE_ROW_TAIL]= + INIT_LOGREC_REDO_PURGE_ROW_TAIL; + log_record_type_descriptor[LOGREC_REDO_PURGE_BLOCKS]= + INIT_LOGREC_REDO_PURGE_BLOCKS; + log_record_type_descriptor[LOGREC_REDO_DELETE_ROW]= + INIT_LOGREC_REDO_DELETE_ROW; + 
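/*
  Illustrative sketch (not part of the patch): the assignments above populate
  log_record_type_descriptor[] at run time, indexed by the enum value rather
  than by position in an initializer list, so the table stays correct even if
  the enum is reordered.  The stand-alone program below mimics that pattern
  with hypothetical names (REC_*, struct rec_desc, rec_init); it is not the
  Maria code itself.
*/
#include <assert.h>
#include <string.h>

enum rec_type { REC_NONE= 0, REC_INSERT, REC_DELETE, REC_TYPES };

struct rec_desc
{
  unsigned fixed_length;      /* bytes for fixed-size records */
  unsigned compressed_lsns;   /* how many LSNs are stored compressed */
};

static struct rec_desc rec_descriptors[REC_TYPES];

static const struct rec_desc INIT_REC_INSERT= { 9, 0 };
static const struct rec_desc INIT_REC_DELETE= { 16, 1 };

/* Fill the table by enum name; the order of the enum no longer matters */
static void rec_init(void)
{
  memset(rec_descriptors, 0, sizeof(rec_descriptors));
  rec_descriptors[REC_INSERT]= INIT_REC_INSERT;
  rec_descriptors[REC_DELETE]= INIT_REC_DELETE;
}

int main(void)
{
  rec_init();
  /* every real record type must have been given a descriptor */
  assert(rec_descriptors[REC_INSERT].fixed_length == 9);
  assert(rec_descriptors[REC_DELETE].compressed_lsns == 1);
  return 0;
}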
log_record_type_descriptor[LOGREC_REDO_UPDATE_ROW_HEAD]= + INIT_LOGREC_REDO_UPDATE_ROW_HEAD; + log_record_type_descriptor[LOGREC_REDO_INDEX]= + INIT_LOGREC_REDO_INDEX; + log_record_type_descriptor[LOGREC_REDO_UNDELETE_ROW]= + INIT_LOGREC_REDO_UNDELETE_ROW; + log_record_type_descriptor[LOGREC_CLR_END]= + INIT_LOGREC_CLR_END; + log_record_type_descriptor[LOGREC_PURGE_END]= + INIT_LOGREC_PURGE_END; + log_record_type_descriptor[LOGREC_UNDO_ROW_INSERT]= + INIT_LOGREC_UNDO_ROW_INSERT; + log_record_type_descriptor[LOGREC_UNDO_ROW_DELETE]= + INIT_LOGREC_UNDO_ROW_DELETE; + log_record_type_descriptor[LOGREC_UNDO_ROW_UPDATE]= + INIT_LOGREC_UNDO_ROW_UPDATE; + log_record_type_descriptor[LOGREC_UNDO_ROW_PURGE]= + INIT_LOGREC_UNDO_ROW_PURGE; + log_record_type_descriptor[LOGREC_UNDO_KEY_INSERT]= + INIT_LOGREC_UNDO_KEY_INSERT; + log_record_type_descriptor[LOGREC_UNDO_KEY_DELETE]= + INIT_LOGREC_UNDO_KEY_DELETE; + log_record_type_descriptor[LOGREC_PREPARE]= + INIT_LOGREC_PREPARE; + log_record_type_descriptor[LOGREC_PREPARE_WITH_UNDO_PURGE]= + INIT_LOGREC_PREPARE_WITH_UNDO_PURGE; + log_record_type_descriptor[LOGREC_COMMIT]= + INIT_LOGREC_COMMIT; + log_record_type_descriptor[LOGREC_COMMIT_WITH_UNDO_PURGE]= + INIT_LOGREC_COMMIT_WITH_UNDO_PURGE; + log_record_type_descriptor[LOGREC_CHECKPOINT_PAGE]= + INIT_LOGREC_CHECKPOINT_PAGE; + log_record_type_descriptor[LOGREC_CHECKPOINT_TRAN]= + INIT_LOGREC_CHECKPOINT_TRAN; + log_record_type_descriptor[LOGREC_CHECKPOINT_TABL]= + INIT_LOGREC_CHECKPOINT_TABL; + log_record_type_descriptor[LOGREC_REDO_CREATE_TABLE]= + INIT_LOGREC_REDO_CREATE_TABLE; + log_record_type_descriptor[LOGREC_REDO_RENAME_TABLE]= + INIT_LOGREC_REDO_RENAME_TABLE; + log_record_type_descriptor[LOGREC_REDO_DROP_TABLE]= + INIT_LOGREC_REDO_DROP_TABLE; + log_record_type_descriptor[LOGREC_REDO_TRUNCATE_TABLE]= + INIT_LOGREC_REDO_TRUNCATE_TABLE; + log_record_type_descriptor[LOGREC_FILE_ID]= + INIT_LOGREC_FILE_ID; + log_record_type_descriptor[LOGREC_LONG_TRANSACTION_ID]= + INIT_LOGREC_LONG_TRANSACTION_ID; }; + /* all possible flags page overheads */ static uint page_overhead[TRANSLOG_FLAGS_NUM]; @@ -1812,6 +1877,7 @@ my_bool translog_init(const char *directory, TRANSLOG_ADDRESS sure_page, last_page, last_valid_page; DBUG_ENTER("translog_init"); + loghandler_init(); if (pthread_mutex_init(&log_descriptor.sent_to_file_lock, MY_MUTEX_INIT_FAST)) @@ -2366,7 +2432,7 @@ static my_bool translog_write_parts_on_page(TRANSLOG_ADDRESS *horizon, DBUG_PRINT("enter", ("Chunk length: %lu parts: %u of %u. 
Page size: %u " "Buffer size: %lu (%lu)", (ulong) length, - (uint) (cur + 1), (uint) parts->parts.elements, + (uint) (cur + 1), (uint) parts->elements, (uint) cursor->current_page_fill, (ulong) cursor->buffer->size, (ulong) (cursor->ptr - cursor->buffer->buffer))); @@ -2378,35 +2444,39 @@ static my_bool translog_write_parts_on_page(TRANSLOG_ADDRESS *horizon, do { translog_size_t len; - struct st_translog_part *part; + LEX_STRING *part; byte *buff; - DBUG_ASSERT(cur < parts->parts.elements); - part= dynamic_element(&parts->parts, cur, struct st_translog_part *); - buff= part->buff; - DBUG_PRINT("info", ("Part: %u Length: %lu left: %lu", - (uint) (cur + 1), (ulong) part->len, (ulong) left)); + DBUG_ASSERT(cur < parts->elements); + part= parts->parts + cur; + buff= (byte*) part->str; + DBUG_PRINT("info", ("Part: %u Length: %lu left: %lu buff: 0x%lx", + (uint) (cur + 1), (ulong) part->length, (ulong) left, + (ulong) buff)); - if (part->len > left) + if (part->length > left) { /* we should write less then the current part */ len= left; - part->len-= len; - part->buff+= len; + part->length-= len; + part->str+= len; DBUG_PRINT("info", ("Set new part: %u Length: %lu", - (uint) (cur + 1), (ulong) part->len)); + (uint) (cur + 1), (ulong) part->length)); } else { - len= part->len; + len= part->length; cur++; DBUG_PRINT("info", ("moved to next part (len: %lu)", (ulong) len)); } DBUG_PRINT("info", ("copy: 0x%lx <- 0x%lx %u", (ulong) cursor->ptr, (ulong)buff, (uint)len)); - memcpy(cursor->ptr, buff, len); - left-= len; - cursor->ptr+= len; + if (likely(len)) + { + memcpy(cursor->ptr, buff, len); + left-= len; + cursor->ptr+= len; + } } while (left); DBUG_PRINT("info", ("Horizon: (%lu,0x%lx) Length %lu(0x%lx)", @@ -2453,10 +2523,11 @@ translog_write_variable_record_1group_header(struct st_translog_parts *parts, uint16 header_length, byte *chunk0_header) { - struct st_translog_part part; + LEX_STRING *part; DBUG_ASSERT(parts->current != 0); /* first part is left for header */ - parts->total_record_length+= (part.len= header_length); - part.buff= chunk0_header; + part= parts->parts + (--parts->current); + parts->total_record_length+= (part->length= header_length); + part->str= (char*)chunk0_header; /* puts chunk type */ *chunk0_header= (byte) (type | TRANSLOG_CHUNK_LSN); int2store(chunk0_header + 1, short_trid); @@ -2466,8 +2537,6 @@ translog_write_variable_record_1group_header(struct st_translog_parts *parts, header_length); /* puts 0 as chunk length which indicate 1 group record */ int2store(chunk0_header + header_length - 2, 0); - parts->current--; - set_dynamic(&parts->parts, (gptr) &part, parts->current); } @@ -2582,7 +2651,7 @@ translog_write_variable_record_chunk3_page(struct st_translog_parts *parts, struct st_buffer_cursor *cursor) { struct st_translog_buffer *buffer_to_flush; - struct st_translog_part part; + LEX_STRING *part; int rc; byte chunk3_header[1 + 2]; DBUG_ENTER("translog_write_variable_record_chunk3_page"); @@ -2606,14 +2675,13 @@ translog_write_variable_record_chunk3_page(struct st_translog_parts *parts, } DBUG_ASSERT(parts->current != 0); /* first part is left for header */ - parts->total_record_length+= (part.len= 1 + 2); - part.buff= chunk3_header; + part= parts->parts + (--parts->current); + parts->total_record_length+= (part->length= 1 + 2); + part->str= (char*)chunk3_header; /* Puts chunk type */ *chunk3_header= (byte) (TRANSLOG_CHUNK_LNGTH); /* Puts chunk length */ int2store(chunk3_header + 1, length); - parts->current--; - set_dynamic(&parts->parts, (gptr) &part, 
parts->current); translog_write_parts_on_page(horizon, cursor, length + 1 + 2, parts); DBUG_RETURN(0); @@ -3243,61 +3311,63 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts, LSN base_lsn, uint lsns, byte *compressed_LSNs) { - struct st_translog_part *part; + LEX_STRING *part; uint lsns_len= lsns * LSN_STORE_SIZE; DBUG_ENTER("translog_relative_LSN_encode"); - part= dynamic_element(&parts->parts, parts->current, - struct st_translog_part *); + part= parts->parts + parts->current; /* collect all LSN(s) in one chunk if it (they) is (are) divided */ - if (part->len < lsns_len) + if (part->length < lsns_len) { - uint copied= part->len; + uint copied= part->length; + LEX_STRING *next_part; DBUG_PRINT("info", ("Using buffer: 0x%lx", (ulong) compressed_LSNs)); - memcpy(compressed_LSNs, part->buff, part->len); + memcpy(compressed_LSNs, (byte*)part->str, part->length); + next_part= parts->parts + parts->current + 1; do { - struct st_translog_part *next_part; - next_part= dynamic_element(&parts->parts, parts->current + 1, - struct st_translog_part *); - if ((next_part->len + copied) < lsns_len) + DBUG_ASSERT(next_part < parts->parts + parts->elements); + if ((next_part->length + copied) < lsns_len) { - memcpy(compressed_LSNs + copied, next_part->buff, next_part->len); - copied+= next_part->len; - delete_dynamic_element(&parts->parts, parts->current + 1); + memcpy(compressed_LSNs + copied, (byte*)next_part->str, + next_part->length); + copied+= next_part->length; + next_part->length= 0; next_part->str= 0; + /* delete_dynamic_element(&parts->parts, parts->current + 1); */ + next_part++; } else { uint len= lsns_len - copied; - memcpy(compressed_LSNs + copied, next_part->buff, len); + memcpy(compressed_LSNs + copied, (byte*)next_part->str, len); copied= lsns_len; - next_part->buff+= len; - next_part->len-= len; + next_part->str+= len; + next_part->length-= len; } } while (copied < lsns_len); - part->len= lsns_len; - part->buff= compressed_LSNs; + part->length= lsns_len; + part->str= (char*)compressed_LSNs; } { /* Compress */ LSN ref; uint economy; - byte *ref_ptr= part->buff + lsns_len - LSN_STORE_SIZE; - byte *dst_ptr= part->buff + lsns_len; - for (; ref_ptr >= part->buff ; ref_ptr-= LSN_STORE_SIZE) + byte *ref_ptr= (byte*)part->str + lsns_len - LSN_STORE_SIZE; + byte *dst_ptr= (byte*)part->str + lsns_len; + for (; ref_ptr >= (byte*)part->str ; ref_ptr-= LSN_STORE_SIZE) { ref= lsn_korr(ref_ptr); if ((dst_ptr= translog_put_LSN_diff(base_lsn, ref, dst_ptr)) == NULL) DBUG_RETURN(1); } /* Note that dst_ptr did grow downward ! 
*/ - economy= (uint) (dst_ptr - part->buff); + economy= (uint) (dst_ptr - (byte*)part->str); DBUG_PRINT("info", ("Economy: %u", economy)); - part->len-= economy; + part->length-= economy; parts->record_length-= economy; parts->total_record_length-= economy; - part->buff= dst_ptr; + part->str= (char*)dst_ptr; } DBUG_RETURN(0); } @@ -3879,7 +3949,7 @@ static my_bool translog_write_fixed_record(LSN *lsn, byte chunk1_header[1 + 2]; /* Max number of such LSNs per record is 2 */ byte compressed_LSNs[2 * LSN_STORE_SIZE]; - struct st_translog_part part; + LEX_STRING *part; int rc; DBUG_ENTER("translog_write_fixed_record"); DBUG_ASSERT((log_record_type_descriptor[type].class == @@ -3949,12 +4019,11 @@ static my_bool translog_write_fixed_record(LSN *lsn, the destination page) */ DBUG_ASSERT(parts->current != 0); /* first part is left for header */ - parts->total_record_length+= (part.len= 1 + 2); - part.buff= chunk1_header; + part= parts->parts + (--parts->current); + parts->total_record_length+= (part->length= 1 + 2); + part->str= (char*)chunk1_header; *chunk1_header= (byte) (type | TRANSLOG_CHUNK_FIXED); int2store(chunk1_header + 1, short_trid); - parts->current--; - set_dynamic(&parts->parts, (gptr) &part, parts->current); rc= translog_write_parts_on_page(&log_descriptor.horizon, &log_descriptor.bc, @@ -3990,9 +4059,12 @@ err: short_trid Sort transaction ID or 0 if it has no sense tcb Transaction control block pointer for hooks by record log type - partN_length length of Ns part of the log - partN_buffer pointer on Ns part buffer - 0 sign of the end of parts + rec_len record length or 0 (count it) + part_no number of parts or 0 (count it) + parts_data zero ended (in case of number of parts is 0) + array of LEX_STRINGs (parts), first + TRANSLOG_INTERNAL_PARTS positions in the log + should be unused (need for loghandler) RETURN 0 OK @@ -4002,73 +4074,67 @@ err: my_bool translog_write_record(LSN *lsn, enum translog_record_type type, SHORT_TRANSACTION_ID short_trid, - void *tcb, - translog_size_t part1_length, - byte *part1_buff, ...) 
+ void *tcb, struct st_maria_share *share, + translog_size_t rec_len, + uint part_no, + LEX_STRING *parts_data) { struct st_translog_parts parts; - struct st_translog_part part; - va_list pvar; + LEX_STRING *part; int rc; DBUG_ENTER("translog_write_record"); DBUG_PRINT("enter", ("type: %u ShortTrID: %u", (uint) type, (uint)short_trid)); - /* move information about parts into dynamic array */ - if (init_dynamic_array(&parts.parts, sizeof(struct st_translog_part), - 10, 10 CALLER_INFO)) + if (share && !share->base.transactional) { - UNRECOVERABLE_ERROR(("init array failed")); - DBUG_RETURN(1); + DBUG_PRINT("info", ("It is not transactional table")); + DBUG_RETURN(0); } - /* reserve place for header */ - parts.current= 1; - part.len= 0; - part.buff= 0; - if (insert_dynamic(&parts.parts, (gptr) &part)) - { - UNRECOVERABLE_ERROR(("insert into array failed")); - DBUG_RETURN(1); - } + parts.parts= parts_data; - parts.record_length= part.len= part1_length; - part.buff= part1_buff; - if (insert_dynamic(&parts.parts, (gptr) &part)) + /* count parts if they are not counted by upper level */ + if (part_no == 0) { - UNRECOVERABLE_ERROR(("insert into array failed")); - DBUG_RETURN(1); + for (part_no= TRANSLOG_INTERNAL_PARTS; + parts_data[part_no].length != 0; + part_no++); } - DBUG_PRINT("info", ("record length: %lu %lu ...", - (ulong) parts.record_length, - (ulong) parts.total_record_length)); + parts.elements= part_no; + parts.current= TRANSLOG_INTERNAL_PARTS; - /* count record length */ - va_start(pvar, part1_buff); - for (;;) + /* clear TRANSLOG_INTERNAL_PARTS */ + DBUG_ASSERT(TRANSLOG_INTERNAL_PARTS == 1); + parts_data[0].str= 0; + parts_data[0].length= 0; + + /* count length of the record */ + if (rec_len == 0) { - part.len= va_arg(pvar, translog_size_t); - if (part.len == 0) - break; - parts.record_length+= part.len; - part.buff= va_arg(pvar, byte*); - if (insert_dynamic(&parts.parts, (gptr) &part)) + for(part= parts_data + TRANSLOG_INTERNAL_PARTS;\ + part < parts_data + part_no; + part++) { - UNRECOVERABLE_ERROR(("insert into array failed")); - DBUG_RETURN(1); + rec_len+= part->length; } - DBUG_PRINT("info", ("record length: %lu %lu ...", - (ulong) parts.record_length, - (ulong) parts.total_record_length)); } - va_end(pvar); + parts.record_length= rec_len; +#ifndef DBUG_OFF + { + uint i; + uint len= 0; + for (i= TRANSLOG_INTERNAL_PARTS; i < part_no; i++) + len+= parts_data[i].length; + DBUG_ASSERT(len == rec_len); + } +#endif /* Start total_record_length from record_length then overhead will be add */ parts.total_record_length= parts.record_length; - va_end(pvar); DBUG_PRINT("info", ("record length: %lu %lu", (ulong) parts.record_length, (ulong) parts.total_record_length)); @@ -4076,6 +4142,7 @@ my_bool translog_write_record(LSN *lsn, /* process this parts */ if (!(rc= (log_record_type_descriptor[type].prewrite_hook && (*log_record_type_descriptor[type].prewrite_hook) (type, tcb, + share, &parts)))) { switch (log_record_type_descriptor[type].class) { @@ -4093,7 +4160,6 @@ my_bool translog_write_record(LSN *lsn, } } - delete_dynamic(&parts.parts); DBUG_RETURN(rc); } @@ -4968,7 +5034,7 @@ static my_bool translog_init_reader_data(LSN lsn, SYNOPSIS translog_read_record_header() lsn log record serial number (address of the record) - offset From the beginning of the record beginning (read§ + offset From the beginning of the record beginning (read§ by translog_read_record_header). length Length of record part which have to be read. 
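/*
  Illustrative sketch (not part of the patch): how a caller is expected to
  fill the LEX_STRING parts array for the new translog_write_record()
  interface -- the first TRANSLOG_INTERNAL_PARTS slot(s) are left empty for
  the log handler, the data parts follow, and the record length is the sum of
  the part lengths.  The stand-alone code below uses stand-in names
  (my_lex_string, MY_INTERNAL_PARTS, build_parts) so it compiles on its own;
  it only mirrors the calling convention, it is not the real API.
*/
#include <stdio.h>
#include <stddef.h>

typedef struct { char *str; size_t length; } my_lex_string;

enum { MY_INTERNAL_PARTS= 1 };   /* mirrors TRANSLOG_INTERNAL_PARTS */

/* Fill parts[] from two data buffers and return the total record length */
static size_t build_parts(my_lex_string *parts,
                          char *header, size_t header_len,
                          char *row, size_t row_len)
{
  size_t i, rec_len= 0;
  /* the reserved slot belongs to the log handler's own chunk header */
  parts[0].str= NULL;
  parts[0].length= 0;
  parts[MY_INTERNAL_PARTS + 0].str= header;
  parts[MY_INTERNAL_PARTS + 0].length= header_len;
  parts[MY_INTERNAL_PARTS + 1].str= row;
  parts[MY_INTERNAL_PARTS + 1].length= row_len;
  for (i= MY_INTERNAL_PARTS; i < MY_INTERNAL_PARTS + 2; i++)
    rec_len+= parts[i].length;
  return rec_len;
}

int main(void)
{
  char header[4]= "hdr", row[8]= "rowdata";
  my_lex_string parts[MY_INTERNAL_PARTS + 2];
  size_t rec_len= build_parts(parts, header, sizeof(header),
                              row, sizeof(row));
  printf("record length: %lu in 2 data parts\n", (unsigned long) rec_len);
  return 0;
}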
buffer Buffer where to read the record part (have to be at diff --git a/storage/maria/ma_loghandler.h b/storage/maria/ma_loghandler.h index 48101814063..3ccb3bf9af2 100644 --- a/storage/maria/ma_loghandler.h +++ b/storage/maria/ma_loghandler.h @@ -1,3 +1,9 @@ +/* transaction log default cache size (TODO: make it global variable) */ +#define TRANSLOG_PAGECACHE_SIZE 1024*1024*2 +/* transaction log default file size (TODO: make it global variable) */ +#define TRANSLOG_FILE_SIZE 1024*1024*1024 +/* transaction log default flags (TODO: make it global variable) */ +#define TRANSLOG_DEFAULT_FLAGS 0 /* Transaction log flags */ #define TRANSLOG_PAGE_CRC 1 @@ -18,48 +24,77 @@ /* short transaction ID type */ typedef uint16 SHORT_TRANSACTION_ID; +struct st_maria_share; + /* Length of CRC at end of pages */ #define CRC_LENGTH 4 +/* Size of file id in logs */ +#define FILEID_STORE_SIZE 2 +/* Size of page reference in log */ +#define PAGE_STORE_SIZE ROW_EXTENT_PAGE_SIZE +/* Size of page ranges in log */ +#define PAGERANGE_STORE_SIZE ROW_EXTENT_COUNT_SIZE +#define DIRPOS_STORE_SIZE 1 + +/* Store methods to match the above sizes */ +#define fileid_store(T,A) int2store(T,A) +#define page_store(T,A) int5store(T,A) +#define dirpos_store(T,A) ((*(uchar*) (T)) = A) +#define pagerange_store(T,A) int2store(T,A) + /* Length of disk drive sector size (we assume that writing it to disk is atomic operation) */ #define DISK_DRIVE_SECTOR_SIZE 512 +/* + Number of empty entries we need to have in LEX_STRING for + translog_write_record() +*/ +#define LOG_INTERNAL_PARTS 1 + +/* position reserved in an array of parts of a log record */ +#define TRANSLOG_INTERNAL_PARTS 1 + /* types of records in the transaction log */ +/* Todo: Set numbers for these when we have all entries figured out */ + enum translog_record_type { LOGREC_RESERVED_FOR_CHUNKS23= 0, - LOGREC_REDO_INSERT_ROW_HEAD= 1, - LOGREC_REDO_INSERT_ROW_TAIL= 2, - LOGREC_REDO_INSERT_ROW_BLOB= 3, - LOGREC_REDO_INSERT_ROW_BLOBS= 4, - LOGREC_REDO_PURGE_ROW= 5, - eLOGREC_REDO_PURGE_BLOCKS= 6, - LOGREC_REDO_DELETE_ROW= 7, - LOGREC_REDO_UPDATE_ROW_HEAD= 8, - LOGREC_REDO_INDEX= 9, - LOGREC_REDO_UNDELETE_ROW= 10, - LOGREC_CLR_END= 11, - LOGREC_PURGE_END= 12, - LOGREC_UNDO_ROW_INSERT= 13, - LOGREC_UNDO_ROW_DELETE= 14, - LOGREC_UNDO_ROW_UPDATE= 15, - LOGREC_UNDO_KEY_INSERT= 16, - LOGREC_UNDO_KEY_DELETE= 17, - LOGREC_PREPARE= 18, - LOGREC_PREPARE_WITH_UNDO_PURGE= 19, - LOGREC_COMMIT= 20, - LOGREC_COMMIT_WITH_UNDO_PURGE= 21, - LOGREC_CHECKPOINT_PAGE= 22, - LOGREC_CHECKPOINT_TRAN= 23, - LOGREC_CHECKPOINT_TABL= 24, - LOGREC_REDO_CREATE_TABLE= 25, - LOGREC_REDO_RENAME_TABLE= 26, - LOGREC_REDO_DROP_TABLE= 27, - LOGREC_REDO_TRUNCATE_TABLE= 28, - LOGREC_FILE_ID= 29, - LOGREC_LONG_TRANSACTION_ID= 30, + LOGREC_REDO_INSERT_ROW_HEAD, + LOGREC_REDO_INSERT_ROW_TAIL, + LOGREC_REDO_INSERT_ROW_BLOB, + LOGREC_REDO_INSERT_ROW_BLOBS, + LOGREC_REDO_PURGE_ROW_HEAD, + LOGREC_REDO_PURGE_ROW_TAIL, + LOGREC_REDO_PURGE_BLOCKS, + LOGREC_REDO_DELETE_ROW, + LOGREC_REDO_UPDATE_ROW_HEAD, + LOGREC_REDO_INDEX, + LOGREC_REDO_UNDELETE_ROW, + LOGREC_CLR_END, + LOGREC_PURGE_END, + LOGREC_UNDO_ROW_INSERT, + LOGREC_UNDO_ROW_DELETE, + LOGREC_UNDO_ROW_UPDATE, + LOGREC_UNDO_ROW_PURGE, + LOGREC_UNDO_KEY_INSERT, + LOGREC_UNDO_KEY_DELETE, + LOGREC_PREPARE, + LOGREC_PREPARE_WITH_UNDO_PURGE, + LOGREC_COMMIT, + LOGREC_COMMIT_WITH_UNDO_PURGE, + LOGREC_CHECKPOINT_PAGE, + LOGREC_CHECKPOINT_TRAN, + LOGREC_CHECKPOINT_TABL, + LOGREC_REDO_CREATE_TABLE, + LOGREC_REDO_RENAME_TABLE, + LOGREC_REDO_DROP_TABLE, + LOGREC_REDO_TRUNCATE_TABLE, 
+ LOGREC_FILE_ID, + LOGREC_LONG_TRANSACTION_ID, LOGREC_RESERVED_FUTURE_EXTENSION= 63 }; #define LOGREC_NUMBER_OF_TYPES 64 /* Maximum, can't be extended */ @@ -145,17 +180,22 @@ struct st_translog_reader_data my_bool eor; /* end of the record */ }; +#ifdef __cplusplus +extern "C" { +#endif +extern void loghandler_init(); extern my_bool translog_init(const char *directory, uint32 log_file_max_size, uint32 server_version, uint32 server_id, PAGECACHE *pagecache, uint flags); extern my_bool translog_write_record(LSN *lsn, - enum translog_record_type type, - SHORT_TRANSACTION_ID short_trid, - void *tcb, - translog_size_t part1_length, - byte *part1_buff, ...); + enum translog_record_type type, + SHORT_TRANSACTION_ID short_trid, + void *tcb, struct st_maria_share *share, + translog_size_t rec_len, + uint part_no, + LEX_STRING *parts_data); extern void translog_destroy(); @@ -182,4 +222,7 @@ extern translog_size_t translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner, TRANSLOG_HEADER_BUFFER *buff); +#ifdef __cplusplus +} +#endif diff --git a/storage/maria/ma_open.c b/storage/maria/ma_open.c index 9b7313a469b..b3005571436 100644 --- a/storage/maria/ma_open.c +++ b/storage/maria/ma_open.c @@ -19,6 +19,7 @@ #include "ma_sp_defs.h" #include "ma_rt_index.h" #include "ma_blockrec.h" +#include "trnman.h" #include <m_ctype.h> #if defined(MSDOS) || defined(__WIN__) @@ -431,6 +432,9 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE; share->base.default_rec_buff_size= max(share->base.pack_reclength, share->base.max_key_length); + share->page_type= (share->base.transactional ? PAGECACHE_LSN_PAGE : + PAGECACHE_PLAIN_PAGE); + if (share->data_file_type == DYNAMIC_RECORD) { share->base.extra_rec_buff_size= @@ -634,6 +638,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) share->delay_key_write=1; info.state= &share->state.state; /* Change global values by default */ + info.trn= &dummy_transaction_object; pthread_mutex_unlock(&share->intern_lock); /* Allocate buffer for one record */ diff --git a/storage/maria/ma_pagecache.c b/storage/maria/ma_pagecache.c index b7f30eae625..f0c1d674f4b 100755 --- a/storage/maria/ma_pagecache.c +++ b/storage/maria/ma_pagecache.c @@ -95,7 +95,7 @@ #define PCBLOCK_INFO(B) \ DBUG_PRINT("info", \ - ("block 0x%lx file %lu page %lu s %0x hshL 0x%lx req %u/%u " \ + ("block: 0x%lx file: %lu page: %lu s: %0x hshL: 0x%lx req: %u/%u " \ "wrlock: %c", \ (ulong)(B), \ (ulong)((B)->hash_link ? 
\ @@ -124,6 +124,9 @@ my_bool my_disable_flush_pagecache_blocks= 0; #define COND_FOR_WRLOCK 2 /* queue of write lock */ #define COND_SIZE 3 /* number of COND_* queues */ +/* offset of LSN on the page */ +#define PAGE_LSN_OFFSET 0 + typedef pthread_cond_t KEYCACHE_CONDVAR; /* descriptor of the page in the page cache block buffer */ @@ -164,34 +167,34 @@ enum PCBLOCK_TEMPERATURE { PCBLOCK_COLD /*free*/ , PCBLOCK_WARM , PCBLOCK_HOT }; /* debug info */ #ifndef DBUG_OFF -static char *page_cache_page_type_str[]= +static const char *page_cache_page_type_str[]= { - (char*)"PLAIN", - (char*)"LSN" + "PLAIN", + "LSN" }; -static char *page_cache_page_write_mode_str[]= +static const char *page_cache_page_write_mode_str[]= { - (char*)"DELAY", - (char*)"NOW", - (char*)"DONE" + "DELAY", + "NOW", + "DONE" }; -static char *page_cache_page_lock_str[]= +static const char *page_cache_page_lock_str[]= { - (char*)"free -> free ", - (char*)"read -> read ", - (char*)"write -> write", - (char*)"free -> read ", - (char*)"free -> write", - (char*)"read -> free ", - (char*)"write -> free ", - (char*)"write -> read " + "free -> free", + "read -> read", + "write -> write", + "free -> read", + "free -> write", + "read -> free", + "write -> free", + "write -> read" }; -static char *page_cache_page_pin_str[]= +static const char *page_cache_page_pin_str[]= { - (char*)"pinned -> pinned ", - (char*)"unpinned -> unpinned", - (char*)"unpinned -> pinned ", - (char*)"pinned -> unpinned" + "pinned -> pinned", + "unpinned -> unpinned", + "unpinned -> pinned", + "pinned -> unpinned" }; #endif #ifndef DBUG_OFF @@ -309,22 +312,21 @@ static my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block, struct st_my_thread_var *thread= my_thread_var; PAGECACHE_PIN_INFO *info= info_find(block->pin_list, thread); DBUG_ENTER("info_check_pin"); - DBUG_PRINT("enter", ("info_check_pin: thread: 0x%lx pin: %s", - (ulong)thread, - page_cache_page_pin_str[mode])); + DBUG_PRINT("enter", ("thread: 0x%lx pin: %s", + (ulong) thread, page_cache_page_pin_str[mode])); if (info) { if (mode == PAGECACHE_PIN_LEFT_UNPINNED) { DBUG_PRINT("info", - ("info_check_pin: thread: 0x%lx block 0x%lx: LEFT_UNPINNED!!!", + ("info_check_pin: thread: 0x%lx block: 0x%lx ; LEFT_UNPINNED!!!", (ulong)thread, (ulong)block)); DBUG_RETURN(1); } else if (mode == PAGECACHE_PIN) { DBUG_PRINT("info", - ("info_check_pin: thread: 0x%lx block 0x%lx: PIN!!!", + ("info_check_pin: thread: 0x%lx block: 0x%lx ; PIN!!!", (ulong)thread, (ulong)block)); DBUG_RETURN(1); } @@ -334,14 +336,14 @@ static my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block, if (mode == PAGECACHE_PIN_LEFT_PINNED) { DBUG_PRINT("info", - ("info_check_pin: thread: 0x%lx block 0x%lx: LEFT_PINNED!!!", + ("info_check_pin: thread: 0x%lx block: 0x%lx ; LEFT_PINNED!!!", (ulong)thread, (ulong)block)); DBUG_RETURN(1); } else if (mode == PAGECACHE_UNPIN) { DBUG_PRINT("info", - ("info_check_pin: thread: 0x%lx block 0x%lx: UNPIN!!!", + ("info_check_pin: thread: 0x%lx block: 0x%lx ; UNPIN!!!", (ulong)thread, (ulong)block)); DBUG_RETURN(1); } @@ -571,7 +573,6 @@ static uint pagecache_fwrite(PAGECACHE *pagecache, LSN lsn; DBUG_PRINT("info", ("Log handler call")); /* TODO: integrate with page format */ -#define PAGE_LSN_OFFSET 0 lsn= lsn_korr(buffer + PAGE_LSN_OFFSET); /* check CONTROL_FILE_IMPOSSIBLE_FILENO & @@ -1193,7 +1194,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, */ if ((PAGECACHE_HASH_LINK *) thread->opt_info == hash_link) { - KEYCACHE_DBUG_PRINT("link_block: signal", ("thread %ld", thread->id)); + 
KEYCACHE_DBUG_PRINT("link_block: signal", ("thread: %ld", thread->id)); pagecache_pthread_cond_signal(&thread->suspend); wqueue_unlink_from_queue(&pagecache->waiting_for_block, thread); block->requests++; @@ -1204,7 +1205,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, KEYCACHE_THREAD_TRACE("link_block: after signaling"); #if defined(PAGECACHE_DEBUG) KEYCACHE_DBUG_PRINT("link_block", - ("linked,unlinked block %u status=%x #requests=%u #available=%u", + ("linked,unlinked block: %u status: %x #requests: %u #available: %u", PCBLOCK_NUMBER(pagecache, block), block->status, block->requests, pagecache->blocks_available)); #endif @@ -1235,9 +1236,9 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, #if defined(PAGECACHE_DEBUG) pagecache->blocks_available++; KEYCACHE_DBUG_PRINT("link_block", - ("linked block %u:%1u status=%x #requests=%u #available=%u", - PCBLOCK_NUMBER(pagecache, block), at_end, block->status, - block->requests, pagecache->blocks_available)); + ("linked block: %u:%1u status: %x #requests: %u #available: %u", + PCBLOCK_NUMBER(pagecache, block), at_end, block->status, + block->requests, pagecache->blocks_available)); KEYCACHE_DBUG_ASSERT((ulong) pagecache->blocks_available <= pagecache->blocks_used); #endif @@ -1284,9 +1285,10 @@ static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) KEYCACHE_DBUG_ASSERT(pagecache->blocks_available != 0); pagecache->blocks_available--; KEYCACHE_DBUG_PRINT("unlink_block", - ("unlinked block 0x%lx (%u) status=%x #requests=%u #available=%u", - (ulong)block, PCBLOCK_NUMBER(pagecache, block), block->status, - block->requests, pagecache->blocks_available)); + ("unlinked block: 0x%lx (%u) status: %x #requests: %u #available: %u", + (ulong)block, PCBLOCK_NUMBER(pagecache, block), + block->status, + block->requests, pagecache->blocks_available)); PCBLOCK_INFO(block); #endif DBUG_VOID_RETURN; @@ -1310,7 +1312,7 @@ static void reg_requests(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, int count) { DBUG_ENTER("reg_requests"); - DBUG_PRINT("enter", ("block 0x%lx (%u) status=%x, reqs: %u", + DBUG_PRINT("enter", ("block: 0x%lx (%u) status: %x reqs: %u", (ulong)block, PCBLOCK_NUMBER(pagecache, block), block->status, block->requests)); PCBLOCK_INFO(block); @@ -1355,7 +1357,7 @@ static void unreg_request(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, int at_end) { DBUG_ENTER("unreg_request"); - DBUG_PRINT("enter", ("block 0x%lx (%u) status=%x, reqs: %u", + DBUG_PRINT("enter", ("block 0x%lx (%u) status: %x reqs: %u", (ulong)block, PCBLOCK_NUMBER(pagecache, block), block->status, block->requests)); PCBLOCK_INFO(block); @@ -1431,7 +1433,7 @@ static inline void wait_for_readers(PAGECACHE *pagecache while (block->hash_link->requests) { KEYCACHE_DBUG_PRINT("wait_for_readers: wait", - ("suspend thread %ld block %u", + ("suspend thread: %ld block: %u", thread->id, PCBLOCK_NUMBER(pagecache, block))); block->condvar= &thread->suspend; pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); @@ -1790,7 +1792,7 @@ restart: /* This is a request for a page to be removed from cache */ KEYCACHE_DBUG_PRINT("find_block", - ("request for old page in block %u " + ("request for old page in block: %u " "wrmode: %d block->status: %d", PCBLOCK_NUMBER(pagecache, block), wrmode, block->status)); @@ -2028,11 +2030,11 @@ restart: KEYCACHE_DBUG_ASSERT(page_status != -1); *page_st= page_status; DBUG_PRINT("info", - ("block: 0x%lx fd: %u pos %lu block->status %u page_status %u", + ("block: 0x%lx 
fd: %u pos: %lu block->status: %u page_status: %u", (ulong) block, (uint) file->file, (ulong) pageno, block->status, (uint) page_status)); KEYCACHE_DBUG_PRINT("find_block", - ("block: 0x%lx fd: %d pos: %lu block->status: %u page_status: %d", + ("block: 0x%lx fd: %d pos: %lu block->status: %u page_status: %d", (ulong) block, file->file, (ulong) pageno, block->status, page_status)); @@ -2049,7 +2051,7 @@ restart: static void add_pin(PAGECACHE_BLOCK_LINK *block) { DBUG_ENTER("add_pin"); - DBUG_PRINT("enter", ("block 0x%lx pins: %u", + DBUG_PRINT("enter", ("block: 0x%lx pins: %u", (ulong) block, block->pins)); PCBLOCK_INFO(block); @@ -2068,7 +2070,7 @@ static void add_pin(PAGECACHE_BLOCK_LINK *block) static void remove_pin(PAGECACHE_BLOCK_LINK *block) { DBUG_ENTER("remove_pin"); - DBUG_PRINT("enter", ("block 0x%lx pins: %u", + DBUG_PRINT("enter", ("block: 0x%lx pins: %u", (ulong) block, block->pins)); PCBLOCK_INFO(block); @@ -2233,7 +2235,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, enum pagecache_page_pin pin) { DBUG_ENTER("make_lock_and_pin"); - DBUG_PRINT("enter", ("block: 0x%lx (%u), wrlock: %c pins: %u, lock %s, pin: %s", + DBUG_PRINT("enter", ("block: 0x%lx (%u) wrlock: %c pins: %u lock: %s pin: %s", (ulong)block, PCBLOCK_NUMBER(pagecache, block), ((block->status & PCBLOCK_WRLOCK)?'Y':'N'), block->pins, @@ -2242,8 +2244,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, PCBLOCK_INFO(block); DBUG_ASSERT(info_check_pin(block, pin) == 0 && info_check_lock(block, lock, pin) == 0); - switch (lock) - { + switch (lock) { case PAGECACHE_LOCK_WRITE: /* free -> write */ /* Writelock and pin the buffer */ if (get_wrlock(pagecache, block)) @@ -2413,6 +2414,30 @@ static void read_block(PAGECACHE *pagecache, /* + Set LSN on the page to the given one if the given LSN is bigger + + SYNOPSIS + check_and_set_lsn() + lsn LSN to set + block block to check and set +*/ + +static void check_and_set_lsn(LSN lsn, PAGECACHE_BLOCK_LINK *block) +{ + LSN old; + DBUG_ENTER("check_and_set_lsn"); + DBUG_ASSERT(block->type == PAGECACHE_LSN_PAGE); + old= lsn_korr(block->buffer + PAGE_LSN_OFFSET); + DBUG_PRINT("info", ("old lsn: (%lu, 0x%lx) new lsn: (%lu, 0x%lx)", + (ulong)LSN_FILE_NO(old), (ulong)LSN_OFFSET(old), + (ulong)LSN_FILE_NO(lsn), (ulong)LSN_OFFSET(lsn))); + if (cmp_translog_addr(lsn, old) > 0) + lsn_store(block->buffer + PAGE_LSN_OFFSET, lsn); + DBUG_VOID_RETURN; +} + + +/* Unlock/unpin page and put LSN stamp if it need SYNOPSIS @@ -2423,6 +2448,9 @@ static void read_block(PAGECACHE *pagecache, lock lock change pin pin page first_REDO_LSN_for_page do not set it if it is zero + lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it + is bigger then LSN on the page it will be written on + the page NOTE Pininig uses requests registration mechanism it works following way: @@ -2442,12 +2470,12 @@ void pagecache_unlock_page(PAGECACHE *pagecache, pgcache_page_no_t pageno, enum pagecache_page_lock lock, enum pagecache_page_pin pin, - LSN first_REDO_LSN_for_page) + LSN first_REDO_LSN_for_page, LSN lsn) { PAGECACHE_BLOCK_LINK *block; int page_st; DBUG_ENTER("pagecache_unlock_page"); - DBUG_PRINT("enter", ("fd: %u page: %lu l%s p%s", + DBUG_PRINT("enter", ("fd: %u page: %lu %s %s", (uint) file->file, (ulong) pageno, page_cache_page_lock_str[lock], page_cache_page_pin_str[pin])); @@ -2475,19 +2503,15 @@ void pagecache_unlock_page(PAGECACHE *pagecache, pin == PAGECACHE_UNPIN); set_if_bigger(block->rec_lsn, first_REDO_LSN_for_page); } + if (lsn != 0) + { + check_and_set_lsn(lsn, block); + } -#ifndef 
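/*
  Illustrative sketch (not part of the patch): the idea behind
  check_and_set_lsn() above -- when a page is unlocked or unpinned with a log
  LSN, the LSN stored on the page is only ever moved forward, never backward,
  so the page always records the newest log entry that changed it.  The code
  below demonstrates that rule with a plain 64-bit LSN and a hypothetical
  helper (page_set_lsn_if_newer); the real function compares and stores the
  serialized LSN at PAGE_LSN_OFFSET in the block buffer.
*/
#include <assert.h>
#include <stdint.h>

typedef uint64_t lsn_t;

struct page { lsn_t lsn; unsigned char data[512]; };

/* Raise the page LSN to 'lsn' if it is newer; ignore older LSNs */
static void page_set_lsn_if_newer(struct page *p, lsn_t lsn)
{
  if (lsn > p->lsn)
    p->lsn= lsn;
}

int main(void)
{
  struct page p= { 0, {0} };
  page_set_lsn_if_newer(&p, 100);   /* first write stamps the page */
  page_set_lsn_if_newer(&p, 70);    /* an older LSN must not move it back */
  assert(p.lsn == 100);
  page_set_lsn_if_newer(&p, 130);   /* a newer LSN moves it forward */
  assert(p.lsn == 130);
  return 0;
}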
DBUG_OFF - if ( -#endif - make_lock_and_pin(pagecache, block, lock, pin) -#ifndef DBUG_OFF - ) + if (make_lock_and_pin(pagecache, block, lock, pin)) { DBUG_ASSERT(0); /* should not happend */ } -#else - ; -#endif remove_reader(block); /* @@ -2514,11 +2538,15 @@ void pagecache_unlock_page(PAGECACHE *pagecache, pagecache pointer to a page cache data structure file handler for the file for the block of data to be read pageno number of the block of data in the file + lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it + is bigger then LSN on the page it will be written on + the page */ void pagecache_unpin_page(PAGECACHE *pagecache, PAGECACHE_FILE *file, - pgcache_page_no_t pageno) + pgcache_page_no_t pageno, + LSN lsn) { PAGECACHE_BLOCK_LINK *block; int page_st; @@ -2537,25 +2565,20 @@ void pagecache_unpin_page(PAGECACHE *pagecache, block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st); DBUG_ASSERT(block != 0 && page_st == PAGE_READ); -#ifndef DBUG_OFF - if ( -#endif - /* - we can just unpin only with keeping read lock because: - a) we can't pin without any lock - b) we can't unpin keeping write lock - */ - make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_LEFT_READLOCKED, - PAGECACHE_UNPIN) -#ifndef DBUG_OFF - ) + if (lsn != 0) { - DBUG_ASSERT(0); /* should not happend */ + check_and_set_lsn(lsn, block); } -#else - ; -#endif + + /* + we can just unpin only with keeping read lock because: + a) we can't pin without any lock + b) we can't unpin keeping write lock + */ + if (make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_LEFT_READLOCKED, + PAGECACHE_UNPIN)) + DBUG_ASSERT(0); /* should not happend */ remove_reader(block); /* @@ -2584,17 +2607,20 @@ void pagecache_unpin_page(PAGECACHE *pagecache, lock lock change pin pin page first_REDO_LSN_for_page do not set it if it is zero + lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it + is bigger then LSN on the page it will be written on + the page */ void pagecache_unlock(PAGECACHE *pagecache, PAGECACHE_PAGE_LINK *link, enum pagecache_page_lock lock, enum pagecache_page_pin pin, - LSN first_REDO_LSN_for_page) + LSN first_REDO_LSN_for_page, LSN lsn) { PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link; DBUG_ENTER("pagecache_unlock"); - DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu l%s p%s", + DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu %s %s", (ulong) block, (uint) block->hash_link->file.file, (ulong) block->hash_link->pageno, @@ -2611,19 +2637,9 @@ void pagecache_unlock(PAGECACHE *pagecache, if (pin == PAGECACHE_PIN_LEFT_UNPINNED && lock == PAGECACHE_LOCK_READ_UNLOCK) { -#ifndef DBUG_OFF - if ( -#endif - /* block do not need here so we do not provide it */ - make_lock_and_pin(pagecache, 0, lock, pin) -#ifndef DBUG_OFF - ) - { - DBUG_ASSERT(0); /* should not happend */ - } -#else - ; -#endif + /* block do not need here so we do not provide it */ + if (make_lock_and_pin(pagecache, 0, lock, pin)) + DBUG_ASSERT(0); /* should not happend */ DBUG_VOID_RETURN; } @@ -2641,19 +2657,13 @@ void pagecache_unlock(PAGECACHE *pagecache, pin == PAGECACHE_UNPIN); set_if_bigger(block->rec_lsn, first_REDO_LSN_for_page); } - -#ifndef DBUG_OFF - if ( -#endif - make_lock_and_pin(pagecache, block, lock, pin) -#ifndef DBUG_OFF - ) + if (lsn != 0) { - DBUG_ASSERT(0); /* should not happend */ + check_and_set_lsn(lsn, block); } -#else - ; -#endif + + if (make_lock_and_pin(pagecache, block, lock, pin)) + DBUG_ASSERT(0); /* should not happend */ remove_reader(block); /* @@ -2680,10 +2690,14 @@ void pagecache_unlock(PAGECACHE 
*pagecache, pagecache_unpin_page() pagecache pointer to a page cache data structure link direct link to page (returned by read or write) + lsn if it is not CONTROL_FILE_IMPOSSIBLE_LSN (0) and it + is bigger then LSN on the page it will be written on + the page */ void pagecache_unpin(PAGECACHE *pagecache, - PAGECACHE_PAGE_LINK *link) + PAGECACHE_PAGE_LINK *link, + LSN lsn) { PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link; DBUG_ENTER("pagecache_unpin"); @@ -2701,25 +2715,20 @@ void pagecache_unpin(PAGECACHE *pagecache, inc_counter_for_resize_op(pagecache); -#ifndef DBUG_OFF - if ( -#endif - /* - we can just unpin only with keeping read lock because: - a) we can't pin without any lock - b) we can't unpin keeping write lock - */ - make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_LEFT_READLOCKED, - PAGECACHE_UNPIN) -#ifndef DBUG_OFF - ) + if (lsn != 0) { - DBUG_ASSERT(0); /* should not happend */ + check_and_set_lsn(lsn, block); } -#else - ; -#endif + + /* + We can just unpin only with keeping read lock because: + a) we can't pin without any lock + b) we can't unpin keeping write lock + */ + if (make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_LEFT_READLOCKED, + PAGECACHE_UNPIN)) + DBUG_ASSERT(0); /* should not happend */ remove_reader(block); /* @@ -2785,7 +2794,7 @@ byte *pagecache_valid_read(PAGECACHE *pagecache, enum pagecache_page_pin pin= lock_to_pin[lock]; PAGECACHE_PAGE_LINK fake_link; DBUG_ENTER("pagecache_valid_read"); - DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s l%s p%s", + DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s %s %s", (uint) file->file, (ulong) pageno, level, page_cache_page_type_str[type], page_cache_page_lock_str[lock], @@ -2858,14 +2867,16 @@ restart: #endif } - remove_reader(block); /* Link the block into the LRU chain if it's the last submitted request for the block and block will not be pinned. See NOTE for pagecache_unlock_page about registering requests. */ if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN) + { + remove_reader(block); unreg_request(pagecache, block, 1); + } else *link= (PAGECACHE_PAGE_LINK)block; @@ -2909,6 +2920,7 @@ no_key_cache: /* Key cache is not used */ lock can be only PAGECACHE_LOCK_LEFT_WRITELOCKED (page was write locked before) or PAGECACHE_LOCK_WRITE (delete will write lock page before delete) */ + my_bool pagecache_delete_page(PAGECACHE *pagecache, PAGECACHE_FILE *file, pgcache_page_no_t pageno, @@ -2918,7 +2930,7 @@ my_bool pagecache_delete_page(PAGECACHE *pagecache, int error= 0; enum pagecache_page_pin pin= lock_to_pin[lock]; DBUG_ENTER("pagecache_delete_page"); - DBUG_PRINT("enter", ("fd: %u page: %lu l%s p%s", + DBUG_PRINT("enter", ("fd: %u page: %lu %s %s", (uint) file->file, (ulong) pageno, page_cache_page_lock_str[lock], page_cache_page_pin_str[pin])); @@ -3018,12 +3030,34 @@ end: } +my_bool pagecache_delete_pages(PAGECACHE *pagecache, + PAGECACHE_FILE *file, + pgcache_page_no_t pageno, + uint page_count, + enum pagecache_page_lock lock, + my_bool flush) +{ + ulong page_end; + DBUG_ENTER("pagecache_delete_pages"); + DBUG_ASSERT(page_count > 0); + + page_end= pageno + page_count; + do + { + if (pagecache_delete_page(pagecache, file, pageno, + lock, flush)) + DBUG_RETURN(1); + } while (++pageno != page_end); + DBUG_RETURN(0); +} + + /* Write a buffer into a cached file. 
SYNOPSIS - pagecache_write() + pagecache_write_part() pagecache pointer to a page cache data structure file handler for the file to write data to pageno number of the block of data in the file @@ -3107,7 +3141,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, int error= 0; int need_lock_change= write_lock_change_table[lock].need_lock_change; DBUG_ENTER("pagecache_write_part"); - DBUG_PRINT("enter", ("fd: %u page: %lu level: %u type: %s lock: %s " + DBUG_PRINT("enter", ("fd: %u page: %lu level: %u type: %s lock: %s " "pin: %s mode: %s offset: %u size %u", (uint) file->file, (ulong) pageno, level, page_cache_page_type_str[type], @@ -3182,7 +3216,6 @@ restart: goto restart; } - if (write_mode == PAGECACHE_WRITE_DONE) { if ((block->status & PCBLOCK_ERROR) && page_st != PAGE_READ) @@ -3220,30 +3253,30 @@ restart: if (need_lock_change) { -#ifndef DBUG_OFF - int rc= -#endif - /* - RECOVERY TODO BUG We are doing an unlock here, so need to give the - page its rec_lsn - */ - make_lock_and_pin(pagecache, block, - write_lock_change_table[lock].unlock_lock, - write_pin_change_table[pin].unlock_pin); -#ifndef DBUG_OFF - DBUG_ASSERT(rc == 0); -#endif + /* + RECOVERY TODO BUG We are doing an unlock here, so need to give the + page its rec_lsn + */ + if (make_lock_and_pin(pagecache, block, + write_lock_change_table[lock].unlock_lock, + write_pin_change_table[pin].unlock_pin)) + DBUG_ASSERT(0); } - /* Unregister the request */ - DBUG_ASSERT(block->hash_link->requests > 0); - block->hash_link->requests--; /* See NOTE for pagecache_unlock_page about registering requests. */ if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN) + { + /* Unregister the request */ + DBUG_ASSERT(block->hash_link->requests > 0); + block->hash_link->requests--; unreg_request(pagecache, block, 1); + } else + { + if (pin == PAGECACHE_PIN_LEFT_PINNED) + block->hash_link->requests--; *link= (PAGECACHE_PAGE_LINK)block; - + } if (block->status & PCBLOCK_ERROR) error= 1; @@ -3289,8 +3322,9 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) { KEYCACHE_THREAD_TRACE("free block"); KEYCACHE_DBUG_PRINT("free_block", - ("block %u to be freed, hash_link %p", - PCBLOCK_NUMBER(pagecache, block), block->hash_link)); + ("block: %u hash_link 0x%lx", + PCBLOCK_NUMBER(pagecache, block), + (long) block->hash_link)); if (block->hash_link) { /* @@ -3376,9 +3410,9 @@ static int flush_cached_blocks(PAGECACHE *pagecache, if (block->pins) { KEYCACHE_DBUG_PRINT("flush_cached_blocks", - ("block %u (0x%lx) pinned", + ("block: %u (0x%lx) pinned", PCBLOCK_NUMBER(pagecache, block), (ulong)block)); - DBUG_PRINT("info", ("block %u (0x%lx) pinned", + DBUG_PRINT("info", ("block: %u (0x%lx) pinned", PCBLOCK_NUMBER(pagecache, block), (ulong)block)); PCBLOCK_INFO(block); last_errno= -1; @@ -3388,25 +3422,18 @@ static int flush_cached_blocks(PAGECACHE *pagecache, /* if the block is not pinned then it is not write locked */ DBUG_ASSERT((block->status & PCBLOCK_WRLOCK) == 0); DBUG_ASSERT(block->pins == 0); -#ifndef DBUG_OFF - { - int rc= -#endif - make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_WRITE, PAGECACHE_PIN); -#ifndef DBUG_OFF - DBUG_ASSERT(rc == 0); - } -#endif + if (make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_WRITE, PAGECACHE_PIN)) + DBUG_ASSERT(0); KEYCACHE_DBUG_PRINT("flush_cached_blocks", - ("block %u (0x%lx) to be flushed", + ("block: %u (0x%lx) to be flushed", PCBLOCK_NUMBER(pagecache, block), (ulong)block)); - DBUG_PRINT("info", ("block %u (0x%lx) to be flushed", + DBUG_PRINT("info", ("block: %u 
(0x%lx) to be flushed", PCBLOCK_NUMBER(pagecache, block), (ulong)block)); PCBLOCK_INFO(block); pagecache_pthread_mutex_unlock(&pagecache->cache_lock); - DBUG_PRINT("info", ("block %u (0x%lx) pins: %u", + DBUG_PRINT("info", ("block: %u (0x%lx) pins: %u", PCBLOCK_NUMBER(pagecache, block), (ulong)block, block->pins)); DBUG_ASSERT(block->pins == 1); diff --git a/storage/maria/ma_static.c b/storage/maria/ma_static.c index 4a6746b71d8..c77f3f512fd 100644 --- a/storage/maria/ma_static.c +++ b/storage/maria/ma_static.c @@ -20,6 +20,7 @@ #ifndef _global_h #include "maria_def.h" +#include "trnman.h" #endif LIST *maria_open_list=0; @@ -43,6 +44,12 @@ ulong maria_data_pointer_size= 4; PAGECACHE maria_pagecache_var; PAGECACHE *maria_pagecache= &maria_pagecache_var; +PAGECACHE maria_log_pagecache_var; +PAGECACHE *maria_log_pagecache= &maria_log_pagecache_var; + +/* For using maria externally */ +TRN dummy_transaction_object; + /* Enough for comparing if number is zero */ byte maria_zero_string[]= {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}; diff --git a/storage/maria/ma_statrec.c b/storage/maria/ma_statrec.c index 68864e7c170..8ca3a5e989d 100644 --- a/storage/maria/ma_statrec.c +++ b/storage/maria/ma_statrec.c @@ -86,6 +86,7 @@ my_bool _ma_write_static_record(MARIA_HA *info, const byte *record) } my_bool _ma_update_static_record(MARIA_HA *info, MARIA_RECORD_POS pos, + const byte *oldrec __attribute__ ((unused)), const byte *record) { info->rec_cache.seek_not_done=1; /* We have done a seek */ @@ -96,7 +97,8 @@ my_bool _ma_update_static_record(MARIA_HA *info, MARIA_RECORD_POS pos, } -my_bool _ma_delete_static_record(MARIA_HA *info) +my_bool _ma_delete_static_record(MARIA_HA *info, + const byte *record __attribute__ ((unused))) { byte temp[9]; /* 1+sizeof(uint32) */ info->state->del++; diff --git a/storage/maria/ma_test1.c b/storage/maria/ma_test1.c index 1c69e2c95b4..98d55c7d254 100644 --- a/storage/maria/ma_test1.c +++ b/storage/maria/ma_test1.c @@ -33,7 +33,7 @@ static uint insert_count, update_count, remove_count; static uint pack_keys=0, pack_seg=0, key_length; static uint unique_key=HA_NOSAME; static my_bool pagecacheing, null_fields, silent, skip_update, opt_unique, - verbose, skip_delete; + verbose, skip_delete, transactional; static MARIA_COLUMNDEF recinfo[4]; static MARIA_KEYDEF keyinfo[10]; static HA_KEYSEG keyseg[10]; @@ -152,6 +152,7 @@ static int run_test(const char *filename) create_info.max_rows=(ulong) (rec_pointer_size ? (1L << (rec_pointer_size*8))/40 : 0); + create_info.transactional= transactional; if (maria_create(filename, record_type, 1, keyinfo,2+opt_unique,recinfo, uniques, &uniquedef, &create_info, create_flag)) @@ -595,6 +596,9 @@ static struct my_option my_long_options[] = (gptr*) &skip_delete, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"skip-update", 'D', "Don't test updates", (gptr*) &skip_update, (gptr*) &skip_update, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"transactional", 'T', "Test in transactional mode. 
(Only works with block format)", + (gptr*) &transactional, (gptr*) &transactional, 0, GET_BOOL, NO_ARG, + 0, 0, 0, 0, 0, 0}, {"unique", 'C', "Undocumented", (gptr*) &opt_unique, (gptr*) &opt_unique, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"update-rows", 'u', "Undocumented", (gptr*) &update_count, diff --git a/storage/maria/ma_test2.c b/storage/maria/ma_test2.c index 18eaf27073b..cde8da08dca 100644 --- a/storage/maria/ma_test2.c +++ b/storage/maria/ma_test2.c @@ -46,7 +46,7 @@ static void copy_key(struct st_maria_info *info,uint inx, static int verbose=0,testflag=0, first_key=0,async_io=0,pagecacheing=0,write_cacheing=0,locking=0, rec_pointer_size=0,pack_fields=1,silent=0, - opt_quick_mode=0; + opt_quick_mode=0, transactional= 0; static int pack_seg=HA_SPACE_PACK,pack_type=HA_PACK_KEY,remove_count=-1; static int create_flag= 0, srand_arg= 0; static ulong pagecache_size=IO_SIZE*16; @@ -209,6 +209,7 @@ int main(int argc, char *argv[]) (1L << (rec_pointer_size*8))/ reclength : 0); create_info.reloc_rows=(ha_rows) 100; + create_info.transactional= transactional; if (maria_create(filename, record_type, keys,&keyinfo[first_key], use_blob ? 7 : 6, &recinfo[0], 0,(MARIA_UNIQUEDEF*) 0, @@ -993,6 +994,9 @@ static void get_options(int argc, char **argv) case 't': testflag=atoi(++pos); /* testmod */ break; + case 'T': + transactional= 1; + break; case 'q': opt_quick_mode=1; break; @@ -1007,7 +1011,7 @@ static void get_options(int argc, char **argv) case 'V': printf("%s Ver 1.0 for %s at %s\n",progname,SYSTEM_TYPE,MACHINE_TYPE); puts("By Monty, for your professional use\n"); - printf("Usage: %s [-?AbBcDIKLPRqSsVWltv] [-k#] [-f#] [-m#] [-e#] [-E#] [-t#]\n", + printf("Usage: %s [-?AbBcDIKLPRqSsTVWltv] [-k#] [-f#] [-m#] [-e#] [-E#] [-t#]\n", progname); exit(0); case '#': diff --git a/storage/maria/ma_test_all.sh b/storage/maria/ma_test_all.sh index 17e654ac51f..8ee326a9c69 100755 --- a/storage/maria/ma_test_all.sh +++ b/storage/maria/ma_test_all.sh @@ -162,6 +162,9 @@ run_pack_tests -S echo "Running tests with block row format" run_tests -M +echo "Running tests with block row format and transactions" +run_tests "-M -T" + # # Tests that gives warnings # diff --git a/storage/maria/ma_update.c b/storage/maria/ma_update.c index ee0f638ea7c..db0f5641124 100644 --- a/storage/maria/ma_update.c +++ b/storage/maria/ma_update.c @@ -162,7 +162,7 @@ int maria_update(register MARIA_HA *info, const byte *oldrec, byte *newrec) memcpy((char*) &state, (char*) info->state, sizeof(state)); org_split= share->state.split; org_delete_link= share->state.dellink; - if ((*share->update_record)(info,pos,newrec)) + if ((*share->update_record)(info, pos, oldrec, newrec)) goto err; if (!key_changed && (memcmp((char*) &state, (char*) info->state, sizeof(state)) || diff --git a/storage/maria/maria_def.h b/storage/maria/maria_def.h index 5e56b0edc5a..25e27c8d296 100644 --- a/storage/maria/maria_def.h +++ b/storage/maria/maria_def.h @@ -25,10 +25,14 @@ #include <my_no_pthread.h> #endif -#include <pagecache.h> #include "ma_loghandler.h" #include "ma_control_file.h" +#define MAX_NONMAPPED_INSERTS 1000 +#define MARIA_MAX_TREE_LEVELS 32 + +struct st_transaction; + /* undef map from my_nosys; We need test-if-disk full */ #undef my_write @@ -205,8 +209,6 @@ typedef struct st_maria_file_bitmap } MARIA_FILE_BITMAP; -#define MAX_NONMAPPED_INSERTS 1000 - typedef struct st_maria_share { /* Shared between opens */ MARIA_STATE_INFO state; @@ -250,8 +252,8 @@ typedef struct st_maria_share /* Called when write failed */ my_bool 
(*write_record_abort)(struct st_maria_info *); my_bool (*update_record)(struct st_maria_info *, MARIA_RECORD_POS, - const byte *); - my_bool (*delete_record)(struct st_maria_info *); + const byte *, const byte *); + my_bool (*delete_record)(struct st_maria_info *, const byte *record); my_bool (*compare_record)(struct st_maria_info *, const byte *); /* calculate checksum for a row */ ha_checksum(*calc_checksum)(struct st_maria_info *, const byte *); @@ -288,6 +290,7 @@ typedef struct st_maria_share uint base_length; myf write_flag; enum data_file_type data_file_type; + enum pagecache_page_type page_type; /* value depending transactional */ my_bool temporary; /* Below flag is needed to make log tables work with concurrent insert */ my_bool is_log_table; @@ -345,7 +348,6 @@ typedef struct st_maria_row MARIA_RECORD_POS *tail_positions; ha_checksum checksum; byte *empty_bits, *field_lengths; - byte *empty_bits_buffer; /* For storing cur_row.empty_bits */ uint *null_field_lengths; /* All null field lengths */ ulong *blob_lengths; /* Length for each blob */ ulong base_length, normal_length, char_length, varchar_length, blob_length; @@ -371,6 +373,7 @@ typedef struct st_maria_block_scan struct st_maria_info { MARIA_SHARE *s; /* Shared between open:s */ + struct st_transaction *trn; /* Pointer to active transaction */ MARIA_STATUS_INFO *state, save_state; MARIA_ROW cur_row; /* The active row that we just read */ MARIA_ROW new_row; /* Storage for a row during update */ @@ -378,8 +381,10 @@ struct st_maria_info MARIA_BLOB *blobs; /* Pointer to blobs */ MARIA_BIT_BUFF bit_buff; DYNAMIC_ARRAY bitmap_blocks; + DYNAMIC_ARRAY pinned_pages; /* accumulate indexfile changes between write's */ TREE *bulk_insert; + LEX_STRING *log_row_parts; /* For logging */ DYNAMIC_ARRAY *ft1_to_ft2; /* used only in ft1->ft2 conversion */ MEM_ROOT ft_memroot; /* used by the parser */ MYSQL_FTPARSER_PARAM *ftparser_param; /* share info between init/deinit */ @@ -391,6 +396,7 @@ struct st_maria_info byte *rec_buff; /* Temp buffer for recordpack */ byte *int_keypos, /* Save position for next/previous */ *int_maxpos; /* -""- */ + byte *update_field_data; /* Used by update in rows-in-block */ uint int_nod_flag; /* -""- */ uint32 int_keytree_version; /* -""- */ int (*read_record) (struct st_maria_info *, byte*, MARIA_RECORD_POS); @@ -568,8 +574,8 @@ extern pthread_mutex_t THR_LOCK_maria; #define rw_unlock(A) {} #endif - /* Some extern variables */ +/* Some extern variables */ extern LIST *maria_open_list; extern uchar NEAR maria_file_magic[], NEAR maria_pack_file_magic[]; extern uint NEAR maria_read_vec[], NEAR maria_readnext_vec[]; @@ -578,8 +584,8 @@ extern const char *maria_data_root; extern byte maria_zero_string[]; extern my_bool maria_inited; - /* This is used by _ma_calc_xxx_key_length och _ma_store_key */ +/* This is used by _ma_calc_xxx_key_length och _ma_store_key */ typedef struct st_maria_s_param { uint ref_length, key_length, n_ref_length; @@ -589,26 +595,34 @@ typedef struct st_maria_s_param bool store_not_null; } MARIA_KEY_PARAM; - /* Prototypes for intern functions */ +/* Used to store reference to pinned page */ +typedef struct st_pinned_page +{ + PAGECACHE_PAGE_LINK link; + enum pagecache_page_lock unlock; +} MARIA_PINNED_PAGE; + + +/* Prototypes for intern functions */ extern int _ma_read_dynamic_record(MARIA_HA *, byte *, MARIA_RECORD_POS); extern int _ma_read_rnd_dynamic_record(MARIA_HA *, byte *, MARIA_RECORD_POS, my_bool); extern my_bool _ma_write_dynamic_record(MARIA_HA *, const byte *); extern my_bool 
_ma_update_dynamic_record(MARIA_HA *, MARIA_RECORD_POS, - const byte *); -extern my_bool _ma_delete_dynamic_record(MARIA_HA *info); + const byte *, const byte *); +extern my_bool _ma_delete_dynamic_record(MARIA_HA *info, const byte *record); extern my_bool _ma_cmp_dynamic_record(MARIA_HA *info, const byte *record); extern my_bool _ma_write_blob_record(MARIA_HA *, const byte *); extern my_bool _ma_update_blob_record(MARIA_HA *, MARIA_RECORD_POS, - const byte *); + const byte *, const byte *); extern int _ma_read_static_record(MARIA_HA *info, byte *, MARIA_RECORD_POS); extern int _ma_read_rnd_static_record(MARIA_HA *, byte *, MARIA_RECORD_POS, my_bool); extern my_bool _ma_write_static_record(MARIA_HA *, const byte *); extern my_bool _ma_update_static_record(MARIA_HA *, MARIA_RECORD_POS, - const byte *); -extern my_bool _ma_delete_static_record(MARIA_HA *info); + const byte *, const byte *); +extern my_bool _ma_delete_static_record(MARIA_HA *info, const byte *record); extern my_bool _ma_cmp_static_record(MARIA_HA *info, const byte *record); extern int _ma_ck_write(MARIA_HA *info, uint keynr, byte *key, uint length); @@ -891,3 +905,8 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages, ulong); int _ma_sync_table_files(const MARIA_HA *info); int _ma_initialize_data_file(File dfile, MARIA_SHARE *share); + +void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn); + +extern PAGECACHE *maria_log_pagecache; + diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c index 7918c1aa00d..9b8c36f9769 100644 --- a/storage/maria/trnman.c +++ b/storage/maria/trnman.c @@ -16,7 +16,6 @@ #include <my_global.h> #include <my_sys.h> -#include <lf.h> #include <m_string.h> #include "trnman.h" @@ -51,19 +50,38 @@ static TRN **short_trid_to_active_trn; /* locks for short_trid_to_active_trn and pool */ static my_atomic_rwlock_t LOCK_short_trid_to_trn, LOCK_pool; -static LOCKMAN maria_lockman; - /* - short transaction id is at the same time its identifier - for a lock manager - its lock owner identifier (loid) + Simple interface functions */ -#define short_id locks.loid + +uint trnman_increment_locked_tables(TRN *trn) +{ + return trn->locked_tables++; +} + +my_bool trnman_has_locked_tables(TRN *trn) +{ + return trn->locked_tables != 0; +} + +uint trnman_decrement_locked_tables(TRN *trn) +{ + return --trn->locked_tables; +} + +void trnman_reset_locked_tables(TRN *trn) +{ + trn->locked_tables= 0; +} + /* NOTE Just as short_id doubles as loid, this function doubles as short_trid_to_LOCK_OWNER. See the compile-time assert below. */ + +#ifdef NOT_USED static TRN *short_trid_to_TRN(uint16 short_trid) { TRN *trn; @@ -73,6 +91,7 @@ static TRN *short_trid_to_TRN(uint16 short_trid) my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn); return (TRN *)trn; } +#endif static byte *trn_get_hash_key(const byte *trn, uint* len, my_bool unused __attribute__ ((unused))) @@ -83,6 +102,7 @@ static byte *trn_get_hash_key(const byte *trn, uint* len, int trnman_init() { + DBUG_ENTER("trnman_init"); /* Initialize lists. 
active_list_max.min_read_from must be larger than any trid, @@ -94,12 +114,12 @@ int trnman_init() */ active_list_max.trid= active_list_min.trid= 0; - active_list_max.min_read_from= ~0; + active_list_max.min_read_from= ~(ulong) 0; active_list_max.next= active_list_min.prev= 0; active_list_max.prev= &active_list_min; active_list_min.next= &active_list_max; - committed_list_max.commit_trid= ~0; + committed_list_max.commit_trid= ~(ulong) 0; committed_list_max.next= committed_list_min.prev= 0; committed_list_max.prev= &committed_list_min; committed_list_min.next= &committed_list_max; @@ -112,18 +132,21 @@ int trnman_init() global_trid_generator= 0; /* set later by the recovery code */ lf_hash_init(&trid_to_committed_trn, sizeof(TRN*), LF_HASH_UNIQUE, 0, 0, trn_get_hash_key, 0); + DBUG_PRINT("info", ("pthread_mutex_init LOCK_trn_list")); pthread_mutex_init(&LOCK_trn_list, MY_MUTEX_INIT_FAST); my_atomic_rwlock_init(&LOCK_short_trid_to_trn); my_atomic_rwlock_init(&LOCK_pool); short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*), MYF(MY_WME|MY_ZEROFILL)); if (unlikely(!short_trid_to_active_trn)) - return 1; + DBUG_RETURN(1); short_trid_to_active_trn--; /* min short_trid is 1 */ +#ifdef NOT_USED lockman_init(&maria_lockman, (loid_to_lo_func *)&short_trid_to_TRN, 10000); +#endif - return 0; + DBUG_RETURN(0); } /* @@ -133,6 +156,7 @@ int trnman_init() */ void trnman_destroy() { + DBUG_ENTER("trnman_destroy"); DBUG_ASSERT(trid_to_committed_trn.count == 0); DBUG_ASSERT(trnman_active_transactions == 0); DBUG_ASSERT(trnman_committed_transactions == 0); @@ -149,11 +173,15 @@ void trnman_destroy() my_free((void *)trn, MYF(0)); } lf_hash_destroy(&trid_to_committed_trn); + DBUG_PRINT("info", ("pthread_mutex_destroy LOCK_trn_list")); pthread_mutex_destroy(&LOCK_trn_list); my_atomic_rwlock_destroy(&LOCK_short_trid_to_trn); my_atomic_rwlock_destroy(&LOCK_pool); my_free((void *)(short_trid_to_active_trn+1), MYF(0)); +#ifdef NOT_USED lockman_destroy(&maria_lockman); +#endif + DBUG_VOID_RETURN; } /* @@ -164,9 +192,11 @@ void trnman_destroy() */ static TrID new_trid() { + DBUG_ENTER("new_trid"); DBUG_ASSERT(global_trid_generator < 0xffffffffffffLL); + DBUG_PRINT("info", ("safe_mutex_assert_owner LOCK_trn_list")); safe_mutex_assert_owner(&LOCK_trn_list); - return ++global_trid_generator; + DBUG_RETURN(++global_trid_generator); } static void set_short_trid(TRN *trn) @@ -189,9 +219,12 @@ static void set_short_trid(TRN *trn) start a new transaction, allocate and initialize transaction object mutex and cond will be used for lock waits */ -TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) + +TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond, + void *stack_end) { TRN *trn; + DBUG_ENTER("trnman_new_trn"); /* we have a mutex, to do simple things under it - allocate a TRN, @@ -202,6 +235,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) mutex. 
*/ + DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list")); pthread_mutex_lock(&LOCK_trn_list); /* Allocating a new TRN structure */ @@ -222,11 +256,19 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) trn= (TRN *)my_malloc(sizeof(TRN), MYF(MY_WME)); if (unlikely(!trn)) { + DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list")); pthread_mutex_unlock(&LOCK_trn_list); return 0; } trnman_allocated_transactions++; } + trn->pins= lf_hash_get_pins(&trid_to_committed_trn, stack_end); + if (!trn->pins) + { + trnman_free_trn(trn); + return 0; + } + trnman_active_transactions++; trn->min_read_from= active_list_min.next->trid; @@ -237,10 +279,9 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) trn->next= &active_list_max; trn->prev= active_list_max.prev; active_list_max.prev= trn->prev->next= trn; + DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list")); pthread_mutex_unlock(&LOCK_trn_list); - trn->pins= lf_hash_get_pins(&trid_to_committed_trn); - if (unlikely(!trn->min_read_from)) trn->min_read_from= trn->trid; @@ -250,7 +291,11 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) trn->locks.cond= cond; trn->locks.waiting_for= 0; trn->locks.all_locks= 0; +#ifdef NOT_USED trn->locks.pins= lf_alloc_get_pins(&maria_lockman.alloc); +#endif + + trn->locked_tables= 0; /* only after the following function TRN is considered initialized, @@ -258,7 +303,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) */ set_short_trid(trn); - return trn; + DBUG_RETURN(trn); } /* @@ -273,12 +318,19 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) released arbitrarily late. In other words, when locks are released it serves as a start banner for other threads, they start to run. So everything they may need must be ready at that point. + + RETURN + 0 ok + 1 error */ -void trnman_end_trn(TRN *trn, my_bool commit) +int trnman_end_trn(TRN *trn, my_bool commit) { + int res= 1; TRN *free_me= 0; LF_PINS *pins= trn->pins; + DBUG_ENTER("trnman_end_trn"); + DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list")); pthread_mutex_lock(&LOCK_trn_list); /* remove from active list */ @@ -314,30 +366,41 @@ void trnman_end_trn(TRN *trn, my_bool commit) /* if transaction is committed and it was not the only active transaction - add it to the committed list (which is used for read-from relation) + TODO check in the condition below that a transaction have made some + changes, was not read-only. Something like '&& UndoLSN != 0' */ if (commit && active_list_min.next != &active_list_max) { - int res; - trn->commit_trid= global_trid_generator; trn->next= &committed_list_max; trn->prev= committed_list_max.prev; - committed_list_max.prev= trn->prev->next= trn; trnman_committed_transactions++; res= lf_hash_insert(&trid_to_committed_trn, pins, &trn); - DBUG_ASSERT(res == 0); + DBUG_ASSERT(res <= 0); } - else /* otherwise free it right away */ + if (res) { + /* + res == 1 means the condition in the if() above + was false. 
+ res == -1 means lf_hash_insert failed + */ trn->next= free_me; free_me= trn; } + else + { + committed_list_max.prev= trn->prev->next= trn; + } trnman_active_transactions--; + DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list")); pthread_mutex_unlock(&LOCK_trn_list); /* the rest is done outside of a critical section */ +#ifdef NOT_USED lockman_release_locks(&maria_lockman, &trn->locks); +#endif trn->locks.mutex= 0; trn->locks.cond= 0; my_atomic_rwlock_rdlock(&LOCK_short_trid_to_trn); @@ -356,13 +419,20 @@ void trnman_end_trn(TRN *trn, my_bool commit) TRN *t= free_me; free_me= free_me->next; - lf_hash_delete(&trid_to_committed_trn, pins, &t->trid, sizeof(TrID)); + /* + ignore OOM here. it's harmless, and there's nothing we could do, anyway + */ + (void)lf_hash_delete(&trid_to_committed_trn, pins, &t->trid, sizeof(TrID)); trnman_free_trn(t); } lf_hash_put_pins(pins); +#ifdef NOT_USED lf_pinbox_put_pins(trn->locks.pins); +#endif + + DBUG_RETURN(res < 0); } /* @@ -404,27 +474,44 @@ void trnman_free_trn(TRN *trn) found->trid >= trn->min_read_from and found->commit_trid > found->trid + + RETURN + 1 can + 0 cannot + -1 error (OOM) */ -my_bool trnman_can_read_from(TRN *trn, TrID trid) +int trnman_can_read_from(TRN *trn, TrID trid) { TRN **found; my_bool can; LF_REQUIRE_PINS(3); if (trid < trn->min_read_from) - return TRUE; /* can read */ + return 1; /* can read */ if (trid > trn->trid) - return FALSE; /* cannot read */ + return 0; /* cannot read */ found= lf_hash_search(&trid_to_committed_trn, trn->pins, &trid, sizeof(trid)); - if (!found) - return FALSE; /* not in the hash of committed transactions = cannot read */ + if (found == NULL) + return 0; /* not in the hash of committed transactions = cannot read */ + if (found == MY_ERRPTR) + return -1; can= (*found)->commit_trid < trn->trid; - lf_unpin(trn->pins, 2); + lf_hash_search_unpin(trn->pins); return can; } +/* TODO: the stubs below are waiting for savepoints to be implemented */ + +void trnman_new_statement(TRN *trn __attribute__ ((unused))) +{ +} + +void trnman_rollback_statement(TRN *trn __attribute__ ((unused))) +{ +} + /* Allocates two buffers and stores in them some information about transactions @@ -458,6 +545,7 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com) DBUG_ASSERT((NULL == str_act->str) && (NULL == str_com->str)); + DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list")); pthread_mutex_lock(&LOCK_trn_list); str_act->length= 8+(6+2+7+7+7)*trnman_active_transactions; str_com->length= 8+(6+7+7)*trnman_committed_transactions; @@ -513,6 +601,7 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com) err: error= 1; end: + DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list")); pthread_mutex_unlock(&LOCK_trn_list); DBUG_RETURN(error); } diff --git a/storage/maria/trnman.h b/storage/maria/trnman.h index 87107ab52fb..24936253935 100644 --- a/storage/maria/trnman.h +++ b/storage/maria/trnman.h @@ -16,10 +16,12 @@ #ifndef _trnman_h #define _trnman_h -#include "lockman.h" +C_MODE_START -typedef uint64 TrID; /* our TrID is 6 bytes */ -typedef struct st_transaction TRN; +#include <lf.h> +#include "lockman.h" +#include "trnman_public.h" +#include "ma_loghandler_lsn.h" /* trid - 6 byte transaction identifier. Assigned when a transaction @@ -29,28 +31,28 @@ typedef struct st_transaction TRN; short_trid - 2-byte transaction identifier, identifies a running transaction, is reassigned when transaction ends. 
*/ + +/* + short transaction id is at the same time its identifier + for a lock manager - its lock owner identifier (loid) +*/ + +#define short_id locks.loid + struct st_transaction { LOCK_OWNER locks; /* must be the first! see short_trid_to_TRN() */ LF_PINS *pins; TrID trid, min_read_from, commit_trid; TRN *next, *prev; + LSN undo_lsn; + uint locked_tables; /* Note! if locks.loid is 0, trn is NOT initialized */ }; -#define SHORT_TRID_MAX 65535 - -extern uint trnman_active_transactions, trnman_allocated_transactions; +TRN dummy_transaction_object; -int trnman_init(void); -void trnman_destroy(void); -TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond); -void trnman_end_trn(TRN *trn, my_bool commit); -#define trnman_commit_trn(T) trnman_end_trn(T, TRUE) -#define trnman_abort_trn(T) trnman_end_trn(T, FALSE) -void trnman_free_trn(TRN *trn); -my_bool trnman_can_read_from(TRN *trn, TrID trid); -my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com); +C_MODE_END #endif diff --git a/storage/maria/trnman_public.h b/storage/maria/trnman_public.h new file mode 100644 index 00000000000..4b3f8acb4b3 --- /dev/null +++ b/storage/maria/trnman_public.h @@ -0,0 +1,49 @@ +/* Copyright (C) 2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +/* + External definitions for trnman.h + We need to split this into two files as gcc 4.1.2 gives error if it tries + to include my_atomic.h in C++ code. 
+*/ + +C_MODE_START +typedef uint64 TrID; /* our TrID is 6 bytes */ +typedef struct st_transaction TRN; + +#define SHORT_TRID_MAX 65535 + +extern uint trnman_active_transactions, trnman_allocated_transactions; + +int trnman_init(void); +void trnman_destroy(void); +TRN *trnman_new_trn(pthread_mutex_t *, pthread_cond_t *, void *); +int trnman_end_trn(TRN *trn, my_bool commit); +#define trnman_commit_trn(T) trnman_end_trn(T, TRUE) +#define trnman_abort_trn(T) trnman_end_trn(T, FALSE) +#define trnman_rollback_trn(T) trnman_end_trn(T, FALSE) +void trnman_free_trn(TRN *trn); +int trnman_can_read_from(TRN *trn, TrID trid); +void trnman_new_statement(TRN *trn); +void trnman_rollback_statement(TRN *trn); +my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com); + +uint trnman_increment_locked_tables(TRN *trn); +uint trnman_decrement_locked_tables(TRN *trn); +my_bool trnman_has_locked_tables(TRN *trn); +void trnman_reset_locked_tables(TRN *trn); + +C_MODE_END diff --git a/storage/maria/unittest/ma_pagecache_single.c b/storage/maria/unittest/ma_pagecache_single.c index 91cceee618d..2dfa4e89feb 100644 --- a/storage/maria/unittest/ma_pagecache_single.c +++ b/storage/maria/unittest/ma_pagecache_single.c @@ -236,7 +236,7 @@ int simple_pin_test() 0, PAGECACHE_LOCK_READ_UNLOCK, PAGECACHE_UNPIN, - 0); + 0, 0); if (flush_pagecache_blocks(&pagecache, &file1, FLUSH_FORCE_WRITE)) { diag("Got error in flush_pagecache_blocks\n"); @@ -384,7 +384,7 @@ int simple_big_test() } } } - ok(1, "simple big file sequentally read"); + ok(1, "Simple big file sequential read"); /* chack random reads */ for (i= 0; i < PCACHE_SIZE/(PAGE_SIZE); i++) { @@ -403,7 +403,7 @@ int simple_big_test() } } } - ok(1, "simple big file random read"); + ok(1, "Simple big file random read"); flush_pagecache_blocks(&pagecache, &file1, FLUSH_FORCE_WRITE); ok((res= test(test_file(file1, file1_name, PCACHE_SIZE*2, PAGE_SIZE, @@ -432,10 +432,15 @@ static void *test_thread(void *arg) !simple_read_change_write_read_test() || !simple_pin_test() || !simple_delete_forget_test() || - !simple_delete_flush_test() || - !simple_big_test()) + !simple_delete_flush_test()) exit(1); + SKIP_BIG_TESTS(4) + { + if (!simple_big_test()) + exit(1); + } + DBUG_PRINT("info", ("Thread %s ended\n", my_thread_name())); pthread_mutex_lock(&LOCK_thread_count); thread_count--; diff --git a/storage/maria/unittest/ma_test_loghandler-t.c b/storage/maria/unittest/ma_test_loghandler-t.c index 757520322c8..047e9c12bfc 100644 --- a/storage/maria/unittest/ma_test_loghandler-t.c +++ b/storage/maria/unittest/ma_test_loghandler-t.c @@ -108,6 +108,7 @@ int main(int argc, char *argv[]) PAGECACHE pagecache; LSN lsn, lsn_base, first_lsn; TRANSLOG_HEADER_BUFFER rec; + LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 3]; struct st_translog_scanner_data scanner; int rc; @@ -163,9 +164,12 @@ int main(int argc, char *argv[]) long_tr_id[5]= 0xff; int4store(long_tr_id, 0); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; if (translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID, - 0, NULL, 6, long_tr_id, 0)) + 0, NULL, NULL, + 6, TRANSLOG_INTERNAL_PARTS + 1, parts)) { fprintf(stderr, "Can't write record #%lu\n", (ulong) 0); translog_destroy(); @@ -180,9 +184,13 @@ int main(int argc, char *argv[]) if (i % 2) { lsn_store(lsn_buff, lsn_base); - if (translog_write_record(&lsn, - LOGREC_CLR_END, - (i % 0xFFFF), NULL, 7, lsn_buff, 0)) + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 
0].length= LSN_STORE_SIZE; + /* check auto-count feature */ + parts[TRANSLOG_INTERNAL_PARTS + 1].str= NULL; + parts[TRANSLOG_INTERNAL_PARTS + 1].length= 0; + if (translog_write_record(&lsn, LOGREC_CLR_END, (i % 0xFFFF), NULL, + NULL, LSN_STORE_SIZE, 0, parts)) { fprintf(stderr, "1 Can't write reference defore record #%lu\n", (ulong) i); @@ -194,10 +202,16 @@ int main(int argc, char *argv[]) lsn_store(lsn_buff, lsn_base); if ((rec_len= random() / (RAND_MAX / (LONG_BUFFER_SIZE + 1))) < 12) rec_len= 12; + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= LSN_STORE_SIZE; + parts[TRANSLOG_INTERNAL_PARTS + 1].str= (char*)long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 1].length= rec_len; + /* check record length auto-counting */ if (translog_write_record(&lsn, LOGREC_UNDO_KEY_INSERT, (i % 0xFFFF), - NULL, 7, lsn_buff, rec_len, long_buffer, 0)) + NULL, NULL, 0, TRANSLOG_INTERNAL_PARTS + 2, + parts)) { fprintf(stderr, "1 Can't write var reference defore record #%lu\n", (ulong) i); @@ -211,9 +225,12 @@ int main(int argc, char *argv[]) { lsn_store(lsn_buff, lsn_base); lsn_store(lsn_buff + LSN_STORE_SIZE, first_lsn); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 23; if (translog_write_record(&lsn, LOGREC_UNDO_ROW_DELETE, - (i % 0xFFFF), NULL, 23, lsn_buff, 0)) + (i % 0xFFFF), NULL, NULL, + 23, TRANSLOG_INTERNAL_PARTS + 1, parts)) { fprintf(stderr, "0 Can't write reference defore record #%lu\n", (ulong) i); @@ -226,10 +243,15 @@ int main(int argc, char *argv[]) lsn_store(lsn_buff + LSN_STORE_SIZE, first_lsn); if ((rec_len= random() / (RAND_MAX / (LONG_BUFFER_SIZE + 1))) < 19) rec_len= 19; + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 14; + parts[TRANSLOG_INTERNAL_PARTS + 1].str= (char*)long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 1].length= rec_len; if (translog_write_record(&lsn, LOGREC_UNDO_KEY_DELETE, (i % 0xFFFF), - NULL, 14, lsn_buff, rec_len, long_buffer, 0)) + NULL, NULL, 14 + rec_len, + TRANSLOG_INTERNAL_PARTS + 2, parts)) { fprintf(stderr, "0 Can't write var reference defore record #%lu\n", (ulong) i); @@ -240,9 +262,13 @@ int main(int argc, char *argv[]) ok(1, "write LOGREC_UNDO_KEY_DELETE"); } int4store(long_tr_id, i); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; if (translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID, - (i % 0xFFFF), NULL, 6, long_tr_id, 0)) + (i % 0xFFFF), NULL, NULL, 6, + TRANSLOG_INTERNAL_PARTS + 1, + parts)) { fprintf(stderr, "Can't write record #%lu\n", (ulong) i); translog_destroy(); @@ -255,9 +281,13 @@ int main(int argc, char *argv[]) if ((rec_len= random() / (RAND_MAX / (LONG_BUFFER_SIZE + 1))) < 9) rec_len= 9; + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= rec_len; if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_HEAD, - (i % 0xFFFF), NULL, rec_len, long_buffer, 0)) + (i % 0xFFFF), NULL, NULL, rec_len, + TRANSLOG_INTERNAL_PARTS + 1, + parts)) { fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i); translog_destroy(); diff --git a/storage/maria/unittest/ma_test_loghandler_multigroup-t.c b/storage/maria/unittest/ma_test_loghandler_multigroup-t.c index 4aaf30bd9a3..f07ceab1a49 100644 --- a/storage/maria/unittest/ma_test_loghandler_multigroup-t.c +++ b/storage/maria/unittest/ma_test_loghandler_multigroup-t.c @@ -124,6 +124,7 @@ int main(int argc, char 
*argv[]) PAGECACHE pagecache; LSN lsn, lsn_base, first_lsn; TRANSLOG_HEADER_BUFFER rec; + LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 2]; struct st_translog_scanner_data scanner; int rc; @@ -183,9 +184,10 @@ int main(int argc, char *argv[]) long_tr_id[5]= 0xff; int4store(long_tr_id, 0); - if (translog_write_record(&lsn, - LOGREC_LONG_TRANSACTION_ID, - 0, NULL, 6, long_tr_id, 0)) + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; + if (translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID, 0, NULL, NULL, + 6, TRANSLOG_INTERNAL_PARTS + 1, parts)) { fprintf(stderr, "Can't write record #%lu\n", (ulong) 0); translog_destroy(); @@ -200,10 +202,13 @@ int main(int argc, char *argv[]) if (i % 2) { lsn_store(lsn_buff, lsn_base); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= LSN_STORE_SIZE; if (translog_write_record(&lsn, LOGREC_CLR_END, - (i % 0xFFFF), NULL, - LSN_STORE_SIZE, lsn_buff, 0)) + (i % 0xFFFF), NULL, NULL, + LSN_STORE_SIZE, + TRANSLOG_INTERNAL_PARTS + 1, parts)) { fprintf(stderr, "1 Can't write reference before record #%lu\n", (ulong) i); @@ -214,11 +219,16 @@ int main(int argc, char *argv[]) ok(1, "write LOGREC_CLR_END"); lsn_store(lsn_buff, lsn_base); rec_len= get_len(); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= LSN_STORE_SIZE; + parts[TRANSLOG_INTERNAL_PARTS + 1].str= (char*)long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 1].length= rec_len; if (translog_write_record(&lsn, LOGREC_UNDO_KEY_INSERT, (i % 0xFFFF), - NULL, LSN_STORE_SIZE, lsn_buff, - rec_len, long_buffer, 0)) + NULL, NULL, LSN_STORE_SIZE + rec_len, + TRANSLOG_INTERNAL_PARTS + 2, + parts)) { fprintf(stderr, "1 Can't write var reference before record #%lu\n", (ulong) i); @@ -232,9 +242,13 @@ int main(int argc, char *argv[]) { lsn_store(lsn_buff, lsn_base); lsn_store(lsn_buff + LSN_STORE_SIZE, first_lsn); + parts[TRANSLOG_INTERNAL_PARTS + 1].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 1].length= 23; if (translog_write_record(&lsn, LOGREC_UNDO_ROW_DELETE, - (i % 0xFFFF), NULL, 23, lsn_buff, 0)) + (i % 0xFFFF), NULL, NULL, 23, + TRANSLOG_INTERNAL_PARTS + 1, + parts)) { fprintf(stderr, "0 Can't write reference before record #%lu\n", (ulong) i); @@ -246,11 +260,16 @@ int main(int argc, char *argv[]) lsn_store(lsn_buff, lsn_base); lsn_store(lsn_buff + LSN_STORE_SIZE, first_lsn); rec_len= get_len(); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)lsn_buff; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= LSN_STORE_SIZE * 2; + parts[TRANSLOG_INTERNAL_PARTS + 1].str= (char*)long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 1].length= rec_len; if (translog_write_record(&lsn, LOGREC_UNDO_KEY_DELETE, (i % 0xFFFF), - NULL, LSN_STORE_SIZE * 2, lsn_buff, - rec_len, long_buffer, 0)) + NULL, NULL, LSN_STORE_SIZE * 2 + rec_len, + TRANSLOG_INTERNAL_PARTS + 2, + parts)) { fprintf(stderr, "0 Can't write var reference before record #%lu\n", (ulong) i); @@ -261,9 +280,12 @@ int main(int argc, char *argv[]) ok(1, "write LOGREC_UNDO_KEY_DELETE"); } int4store(long_tr_id, i); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; if (translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID, - (i % 0xFFFF), NULL, 6, long_tr_id, 0)) + (i % 0xFFFF), NULL, NULL, 6, + TRANSLOG_INTERNAL_PARTS + 1, parts)) { fprintf(stderr, "Can't write record #%lu\n", (ulong) i); translog_destroy(); @@ -275,9 +297,12 @@ int main(int argc, char 
*argv[]) lsn_base= lsn; rec_len= get_len(); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= rec_len; if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_HEAD, - (i % 0xFFFF), NULL, rec_len, long_buffer, 0)) + (i % 0xFFFF), NULL, NULL, rec_len, + TRANSLOG_INTERNAL_PARTS + 1, parts)) { fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i); translog_destroy(); diff --git a/storage/maria/unittest/ma_test_loghandler_multithread-t.c b/storage/maria/unittest/ma_test_loghandler_multithread-t.c index 1834e720328..2651258e290 100644 --- a/storage/maria/unittest/ma_test_loghandler_multithread-t.c +++ b/storage/maria/unittest/ma_test_loghandler_multithread-t.c @@ -124,12 +124,16 @@ void writer(int num) { uint len= get_len(); lens[num][i]= len; + LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1]; int2store(long_tr_id, num); int4store(long_tr_id + 2, i); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; if (translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID, - num, NULL, 6, long_tr_id, 0)) + num, NULL, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1, + parts)) { fprintf(stderr, "Can't write LOGREC_LONG_TRANSACTION_ID record #%lu " "thread %i\n", (ulong) i, num); @@ -140,9 +144,13 @@ void writer(int num) return; } lsns1[num][i]= lsn; + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= len; if (translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_HEAD, - num, NULL, len, long_buffer, 0)) + num, NULL, NULL, + len, TRANSLOG_INTERNAL_PARTS + 1, + parts)) { fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i); translog_destroy(); @@ -277,14 +285,18 @@ int main(int argc, char **argv __attribute__ ((unused))) srandom(122334817L); { + LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1]; byte long_tr_id[6]= { 0x11, 0x22, 0x33, 0x44, 0x55, 0x66 }; + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; if (translog_write_record(&first_lsn, LOGREC_LONG_TRANSACTION_ID, - 0, NULL, 6, long_tr_id, 0)) + 0, NULL, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1, + parts)) { fprintf(stderr, "Can't write the first record\n"); translog_destroy(); diff --git a/storage/maria/unittest/ma_test_loghandler_pagecache-t.c b/storage/maria/unittest/ma_test_loghandler_pagecache-t.c index 9204d531ea1..13b5afe7444 100644 --- a/storage/maria/unittest/ma_test_loghandler_pagecache-t.c +++ b/storage/maria/unittest/ma_test_loghandler_pagecache-t.c @@ -25,6 +25,7 @@ int main(int argc, char *argv[]) PAGECACHE pagecache; LSN lsn; MY_STAT st, *stat; + LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1]; MY_INIT(argv[0]); @@ -85,9 +86,12 @@ int main(int argc, char *argv[]) exit(1); } int4store(long_tr_id, 0); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; if (translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID, - 0, NULL, 6, long_tr_id, 0)) + 0, NULL, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1, + parts)) { fprintf(stderr, "Can't write record #%lu\n", (ulong) 0); translog_destroy(); diff --git a/storage/maria/unittest/trnman-t.c b/storage/maria/unittest/trnman-t.c index 7d97794b685..b0a087370f2 100644 --- a/storage/maria/unittest/trnman-t.c +++ b/storage/maria/unittest/trnman-t.c @@ -24,8 +24,11 @@ #include "../trnman.h" pthread_mutex_t rt_mutex; -int rt_num_threads; +pthread_attr_t attr; +size_t stacksize= 0; +#define STACK_SIZE (((int)stacksize-2048)*STACK_DIRECTION) +int 
rt_num_threads; int litmus; /* @@ -34,11 +37,11 @@ int litmus; #define MAX_ITER 100 pthread_handler_t test_trnman(void *arg) { - int m= (*(int *)arg); uint x, y, i, n; TRN *trn[MAX_ITER]; pthread_mutex_t mutexes[MAX_ITER]; pthread_cond_t conds[MAX_ITER]; + int m= (*(int *)arg); for (i= 0; i < MAX_ITER; i++) { @@ -52,7 +55,7 @@ pthread_handler_t test_trnman(void *arg) m-= n= x % MAX_ITER; for (i= 0; i < n; i++) { - trn[i]= trnman_new_trn(&mutexes[i], &conds[i]); + trn[i]= trnman_new_trn(&mutexes[i], &conds[i], &m + STACK_SIZE); if (!trn[i]) { diag("trnman_new_trn() failed"); @@ -96,7 +99,7 @@ void run_test(const char *test, pthread_handler handler, int n, int m) diag("Testing %s with %d threads, %d iterations... ", test, n, m); rt_num_threads= n; for (i= 0; i < n ; i++) - if (pthread_create(threads+i, 0, handler, &m)) + if (pthread_create(threads+i, &attr, handler, &m)) { diag("Could not create thread"); abort(); @@ -112,7 +115,7 @@ void run_test(const char *test, pthread_handler handler, int n, int m) i= trnman_can_read_from(trn[T1], trn[T2]->trid); \ ok(i == RES, "trn" #T1 " %s read from trn" #T2, i ? "can" : "cannot") #define start_transaction(T) \ - trn[T]= trnman_new_trn(&mutexes[T], &conds[T]) + trn[T]= trnman_new_trn(&mutexes[T], &conds[T], &i + STACK_SIZE) #define commit(T) trnman_commit_trn(trn[T]) #define abort(T) trnman_abort_trn(trn[T]) @@ -161,6 +164,12 @@ int main() return exit_status(); pthread_mutex_init(&rt_mutex, 0); + pthread_attr_init(&attr); +#ifdef HAVE_PTHREAD_ATTR_GETSTACKSIZE + pthread_attr_getstacksize(&attr, &stacksize); + if (stacksize == 0) +#endif + stacksize= PTHREAD_STACK_MIN; #define CYCLES 10000 #define THREADS 10 diff --git a/unittest/mytap/tap.c b/unittest/mytap/tap.c index a99da3fd975..f9396adbd69 100644 --- a/unittest/mytap/tap.c +++ b/unittest/mytap/tap.c @@ -166,9 +166,15 @@ static signal_entry install_signal[]= { #endif }; +int skip_big_tests= 0; + void plan(int const count) { + char *config= getenv("MYTAP_CONFIG"); + + if (config) + skip_big_tests= strcmp(config, "big"); setvbuf(tapout, 0, _IONBF, 0); /* provide output at once */ /* diff --git a/unittest/mytap/tap.h b/unittest/mytap/tap.h index 31ec47d1ef2..f92fad1101f 100644 --- a/unittest/mytap/tap.h +++ b/unittest/mytap/tap.h @@ -62,6 +62,24 @@ extern "C" { #endif /** + Defines whether "big" tests should be skipped. + + This variable is set by plan() function unless MYTAP_CONFIG environment + variable is set to the string "big". It is supposed to be used as + + @code + if (skip_big_tests) { + skip(1, "Big test skipped"); + } else { + ok(life_universe_and_everything() == 42, "The answer is CORRECT"); + } + @endcode + + @see SKIP_BIG_TESTS +*/ +extern int skip_big_tests; + +/** @defgroup MyTAP_API MyTAP API MySQL support for performing unit tests according to TAP. @@ -81,7 +99,12 @@ extern "C" { that generate a core, so if you want to override these signals, do it <em>after</em> you have called the plan() function. - @param count The planned number of tests to run. + It will also set skip_big_tests variable if MYTAP_CONFIG environment + variable is defined. + + @see skip_big_tests + + @param count The planned number of tests to run. */ void plan(int count); @@ -161,6 +184,24 @@ void skip(int how_many, char const *reason, ...) /** + Helper macro to skip a group of "big" tests. 
It is used in the following + manner: + + @code + SKIP_BIG_TESTS(1) + { + ok(life_universe_and_everything() == 42, "The answer is CORRECT"); + } + @endcode + + @see skip_big_tests + */ + +#define SKIP_BIG_TESTS(COUNT) \ + if (skip_big_tests) skip((COUNT), "big test"); else + + +/** Print a diagnostics message. @param fmt Diagnostics message in printf() format. diff --git a/unittest/unit.pl b/unittest/unit.pl index 9d328985012..b83132581f9 100644 --- a/unittest/unit.pl +++ b/unittest/unit.pl @@ -14,8 +14,9 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -use Test::Harness qw(&runtests $verbose); +use Test::Harness; use File::Find; +use Getopt::Long; use strict; @@ -35,6 +36,15 @@ unit - Run unit tests in directory =cut +my $big=1; + +my $result = GetOptions ( + "big!" => \$big, + "verbose!" => \$Test::Harness::verbose, +); + +$ENV{'MYTAP_CONFIG'} = $big ? "big" : ""; + my $cmd = shift; if (defined $cmd && exists $dispatch{$cmd}) {
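
For reference, every unit-test update above follows the same pattern when calling the reworked translog_write_record(): the caller fills a LEX_STRING array starting at index TRANSLOG_INTERNAL_PARTS and passes the total payload length plus the number of filled slots. Below is a minimal sketch of such a caller, mirroring the call made in ma_test_loghandler_pagecache-t.c; the wrapper function name and the included headers are illustrative assumptions, not part of this changeset.

/* Illustrative sketch only -- not part of this patch. Headers are assumed. */
#include <my_global.h>
#include <m_string.h>
#include "ma_loghandler.h"

static int write_long_trid(byte *long_tr_id)   /* 6-byte transaction id */
{
  LSN lsn;
  LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1];

  /* caller-supplied data always starts at index TRANSLOG_INTERNAL_PARTS */
  parts[TRANSLOG_INTERNAL_PARTS + 0].str=    (char*) long_tr_id;
  parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;

  /*
    Total length 6, one caller-supplied part; same argument order as the
    call in ma_test_loghandler_pagecache-t.c above (the two NULLs are the
    per-transaction/table context the tests do not need).
  */
  if (translog_write_record(&lsn, LOGREC_LONG_TRANSACTION_ID,
                            0, NULL, NULL, 6,
                            TRANSLOG_INTERNAL_PARTS + 1, parts))
    return 1;                                  /* write failed */
  return 0;
}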