diff options
Diffstat (limited to 'storage/xtradb/os/os0file.c')
-rw-r--r-- | storage/xtradb/os/os0file.c | 195 |
1 files changed, 131 insertions, 64 deletions
diff --git a/storage/xtradb/os/os0file.c b/storage/xtradb/os/os0file.c index 2d130899622..efef608b88f 100644 --- a/storage/xtradb/os/os0file.c +++ b/storage/xtradb/os/os0file.c @@ -73,6 +73,28 @@ UNIV_INTERN ibool os_aio_use_native_aio = FALSE; UNIV_INTERN ibool os_aio_print_debug = FALSE; +/* State for the state of an IO request in simulated AIO. + Protocol for simulated aio: + client requests IO: find slot with reserved = FALSE. Add entry with + status = OS_AIO_NOT_ISSUED. + IO thread wakes: find adjacent slots with reserved = TRUE and status = + OS_AIO_NOT_ISSUED. Change status for slots to + OS_AIO_ISSUED. + IO operation completes: set status for slots to OS_AIO_DONE. set status + for the first slot to OS_AIO_CLAIMED and return + result for that slot. + When there are multiple read and write threads, they all compete to execute + the requests in the array (os_aio_array_t). This avoids the need to load + balance requests at the time the request is made at the cost of waking all + threads when a request is available. +*/ +typedef enum { + OS_AIO_NOT_ISSUED, /* Available to be processed by an IO thread. */ + OS_AIO_ISSUED, /* Being processed by an IO thread. */ + OS_AIO_DONE, /* Request processed. */ + OS_AIO_CLAIMED /* Result being returned to client. */ +} os_aio_status; + /* The aio array slot structure */ typedef struct os_aio_slot_struct os_aio_slot_t; @@ -81,6 +103,8 @@ struct os_aio_slot_struct{ ulint pos; /* index of the slot in the aio array */ ibool reserved; /* TRUE if this slot is reserved */ + os_aio_status status; /* Status for current request. Valid when reserved + is TRUE. Used only in simulated aio. */ time_t reservation_time;/* time when reserved */ ulint len; /* length of the block to read or write */ @@ -91,11 +115,11 @@ struct os_aio_slot_struct{ ulint offset_high; /* 32 high bits of file offset */ os_file_t file; /* file where to read or write */ const char* name; /* file name or path */ - ibool io_already_done;/* used only in simulated aio: - TRUE if the physical i/o already - made and only the slot message - needs to be passed to the caller - of os_aio_simulated_handle */ +// ibool io_already_done;/* used only in simulated aio: +// TRUE if the physical i/o already +// made and only the slot message +// needs to be passed to the caller +// of os_aio_simulated_handle */ fil_node_t* message1; /* message which is given by the */ void* message2; /* the requester of an aio operation and which can be used to identify @@ -141,6 +165,13 @@ struct os_aio_array_struct{ /* Array of events used in simulated aio */ static os_event_t* os_aio_segment_wait_events = NULL; +/* Number for the first global segment for reading. */ +const ulint os_aio_first_read_segment = 2; + +/* Number for the first global segment for writing. Set to +2 + os_aio_read_write_threads. */ +ulint os_aio_first_write_segment = 0; + /* The aio arrays for non-ibuf i/o and ibuf i/o, as well as sync aio. These are NULL when the module has not yet been initialized. */ static os_aio_array_t* os_aio_read_array = NULL; @@ -149,11 +180,17 @@ static os_aio_array_t* os_aio_ibuf_array = NULL; static os_aio_array_t* os_aio_log_array = NULL; static os_aio_array_t* os_aio_sync_array = NULL; +/* Per thread buffer used for merged IO requests. Used by +os_aio_simulated_handle so that a buffer doesn't have to be allocated +for each request. */ +static char* os_aio_thread_buffer[SRV_MAX_N_IO_THREADS]; +static ulint os_aio_thread_buffer_size[SRV_MAX_N_IO_THREADS]; + static ulint os_aio_n_segments = ULINT_UNDEFINED; /* If the following is TRUE, read i/o handler threads try to wait until a batch of new read requests have been posted */ -static ibool os_aio_recommend_sleep_for_read_threads = FALSE; +static volatile ibool os_aio_recommend_sleep_for_read_threads = FALSE; UNIV_INTERN ulint os_n_file_reads = 0; UNIV_INTERN ulint os_bytes_read_since_printout = 0; @@ -2956,6 +2993,8 @@ os_aio_init( for (i = 0; i < n_segments; i++) { srv_set_io_thread_op_info(i, "not started yet"); + os_aio_thread_buffer[i] = 0; + os_aio_thread_buffer_size[i] = 0; } n_per_seg = n / n_segments; @@ -2964,6 +3003,7 @@ os_aio_init( /* fprintf(stderr, "Array n per seg %lu\n", n_per_seg); */ + os_aio_first_write_segment = os_aio_first_read_segment + n_read_threads; os_aio_ibuf_array = os_aio_array_create(n_per_seg, 1); srv_io_thread_function[0] = "insert buffer thread"; @@ -2972,14 +3012,14 @@ os_aio_init( srv_io_thread_function[1] = "log thread"; - os_aio_read_array = os_aio_array_create(n_read_segs * n_per_seg, + os_aio_read_array = os_aio_array_create(n_per_seg, n_read_segs); for (i = 2; i < 2 + n_read_segs; i++) { ut_a(i < SRV_MAX_N_IO_THREADS); srv_io_thread_function[i] = "read thread"; } - os_aio_write_array = os_aio_array_create(n_write_segs * n_per_seg, + os_aio_write_array = os_aio_array_create(n_per_seg, n_write_segs); for (i = 2 + n_read_segs; i < n_segments; i++) { ut_a(i < SRV_MAX_N_IO_THREADS); @@ -3225,7 +3265,8 @@ loop: slot->buf = buf; slot->offset = offset; slot->offset_high = offset_high; - slot->io_already_done = FALSE; +// slot->io_already_done = FALSE; + slot->status = OS_AIO_NOT_ISSUED; #ifdef WIN_ASYNC_IO control = &(slot->control); @@ -3256,6 +3297,7 @@ os_aio_array_free_slot( ut_ad(slot->reserved); slot->reserved = FALSE; + slot->status = OS_AIO_NOT_ISSUED; array->n_reserved--; @@ -3292,16 +3334,18 @@ os_aio_simulated_wake_handler_thread( segment = os_aio_get_array_and_local_segment(&array, global_segment); - n = array->n_slots / array->n_segments; + n = array->n_slots; /* Look through n slots after the segment * n'th slot */ os_mutex_enter(array->mutex); for (i = 0; i < n; i++) { - slot = os_aio_array_get_nth_slot(array, i + segment * n); + slot = os_aio_array_get_nth_slot(array, i); - if (slot->reserved) { + if (slot->reserved && + (slot->status == OS_AIO_NOT_ISSUED || + slot->status == OS_AIO_DONE)) { /* Found an i/o request */ break; @@ -3311,7 +3355,25 @@ os_aio_simulated_wake_handler_thread( os_mutex_exit(array->mutex); if (i < n) { - os_event_set(os_aio_segment_wait_events[global_segment]); + if (array == os_aio_ibuf_array) { + os_event_set(os_aio_segment_wait_events[0]); + + } else if (array == os_aio_log_array) { + os_event_set(os_aio_segment_wait_events[1]); + + } else if (array == os_aio_read_array) { + ulint x; + for (x = os_aio_first_read_segment; x < os_aio_first_write_segment; x++) + os_event_set(os_aio_segment_wait_events[x]); + + } else if (array == os_aio_write_array) { + ulint x; + for (x = os_aio_first_write_segment; x < os_aio_n_segments; x++) + os_event_set(os_aio_segment_wait_events[x]); + + } else { + ut_a(0); + } } } @@ -3322,8 +3384,6 @@ void os_aio_simulated_wake_handler_threads(void) /*=======================================*/ { - ulint i; - if (os_aio_use_native_aio) { /* We do not use simulated aio: do nothing */ @@ -3332,9 +3392,10 @@ os_aio_simulated_wake_handler_threads(void) os_aio_recommend_sleep_for_read_threads = FALSE; - for (i = 0; i < os_aio_n_segments; i++) { - os_aio_simulated_wake_handler_thread(i); - } + os_aio_simulated_wake_handler_thread(0); + os_aio_simulated_wake_handler_thread(1); + os_aio_simulated_wake_handler_thread(os_aio_first_read_segment); + os_aio_simulated_wake_handler_thread(os_aio_first_write_segment); } /************************************************************************** @@ -3606,7 +3667,7 @@ os_aio_windows_handle( ut_ad(os_aio_validate()); ut_ad(segment < array->n_segments); - n = array->n_slots / array->n_segments; + n = array->n_slots; if (array == os_aio_sync_array) { os_event_wait(os_aio_array_get_nth_slot(array, pos)->event); @@ -3615,12 +3676,12 @@ os_aio_windows_handle( srv_set_io_thread_op_info(orig_seg, "wait Windows aio"); i = os_event_wait_multiple(n, (array->native_events) - + segment * n); + ); } os_mutex_enter(array->mutex); - slot = os_aio_array_get_nth_slot(array, i + segment * n); + slot = os_aio_array_get_nth_slot(array, i); ut_a(slot->reserved); @@ -3685,10 +3746,13 @@ os_aio_simulated_handle( os_aio_slot_t* slot; os_aio_slot_t* slot2; os_aio_slot_t* consecutive_ios[OS_AIO_MERGE_N_CONSECUTIVE]; + os_aio_slot_t* lowest_request; + os_aio_slot_t* oldest_request; ulint n_consecutive; ulint total_len; ulint offs; ulint lowest_offset; + ulint oldest_offset; ulint biggest_age; ulint age; byte* combined_buf; @@ -3696,6 +3760,7 @@ os_aio_simulated_handle( ibool ret; ulint n; ulint i; + time_t now; segment = os_aio_get_array_and_local_segment(&array, global_segment); @@ -3708,7 +3773,7 @@ restart: ut_ad(os_aio_validate()); ut_ad(segment < array->n_segments); - n = array->n_slots / array->n_segments; + n = array->n_slots; /* Look through n slots after the segment * n'th slot */ @@ -3730,9 +3795,9 @@ restart: done */ for (i = 0; i < n; i++) { - slot = os_aio_array_get_nth_slot(array, i + segment * n); + slot = os_aio_array_get_nth_slot(array, i); - if (slot->reserved && slot->io_already_done) { + if (slot->reserved && slot->status == OS_AIO_DONE) { if (os_aio_print_debug) { fprintf(stderr, @@ -3754,67 +3819,57 @@ restart: then pick the one at the lowest offset. */ biggest_age = 0; - lowest_offset = ULINT_MAX; + now = time(NULL); + oldest_request = lowest_request = NULL; + oldest_offset = lowest_offset = ULINT_MAX; + /* Find the oldest request and the request with the smallest offset */ for (i = 0; i < n; i++) { - slot = os_aio_array_get_nth_slot(array, i + segment * n); + slot = os_aio_array_get_nth_slot(array, i); - if (slot->reserved) { - age = (ulint)difftime(time(NULL), - slot->reservation_time); + if (slot->reserved && slot->status == OS_AIO_NOT_ISSUED) { + age = (ulint)difftime(now, slot->reservation_time); if ((age >= 2 && age > biggest_age) || (age >= 2 && age == biggest_age - && slot->offset < lowest_offset)) { + && slot->offset < oldest_offset)) { /* Found an i/o request */ - consecutive_ios[0] = slot; - - n_consecutive = 1; - biggest_age = age; - lowest_offset = slot->offset; + oldest_request = slot; + oldest_offset = slot->offset; } - } - } - - if (n_consecutive == 0) { - /* There were no old requests. Look for an i/o request at the - lowest offset in the array (we ignore the high 32 bits of the - offset in these heuristics) */ - - lowest_offset = ULINT_MAX; - - for (i = 0; i < n; i++) { - slot = os_aio_array_get_nth_slot(array, - i + segment * n); - - if (slot->reserved && slot->offset < lowest_offset) { + /* Look for an i/o request at the lowest offset in the array + * (we ignore the high 32 bits of the offset) */ + if (slot->offset < lowest_offset) { /* Found an i/o request */ - consecutive_ios[0] = slot; - - n_consecutive = 1; - + lowest_request = slot; lowest_offset = slot->offset; } } } - if (n_consecutive == 0) { + if (!lowest_request && !oldest_request) { /* No i/o requested at the moment */ goto wait_for_io; } - slot = consecutive_ios[0]; + if (oldest_request) { + slot = oldest_request; + } else { + slot = lowest_request; + } + consecutive_ios[0] = slot; + n_consecutive = 1; /* Check if there are several consecutive blocks to read or write */ consecutive_loop: for (i = 0; i < n; i++) { - slot2 = os_aio_array_get_nth_slot(array, i + segment * n); + slot2 = os_aio_array_get_nth_slot(array, i); if (slot2->reserved && slot2 != slot && slot2->offset == slot->offset + slot->len @@ -3822,7 +3877,8 @@ consecutive_loop: && slot->offset + slot->len > slot->offset && slot2->offset_high == slot->offset_high && slot2->type == slot->type - && slot2->file == slot->file) { + && slot2->file == slot->file + && slot2->status == OS_AIO_NOT_ISSUED) { /* Found a consecutive i/o request */ @@ -3851,6 +3907,8 @@ consecutive_loop: for (i = 0; i < n_consecutive; i++) { total_len += consecutive_ios[i]->len; + ut_a(consecutive_ios[i]->status == OS_AIO_NOT_ISSUED); + consecutive_ios[i]->status = OS_AIO_ISSUED; } if (n_consecutive == 1) { @@ -3858,7 +3916,14 @@ consecutive_loop: combined_buf = slot->buf; combined_buf2 = NULL; } else { - combined_buf2 = ut_malloc(total_len + UNIV_PAGE_SIZE); + if ((total_len + UNIV_PAGE_SIZE) > os_aio_thread_buffer_size[global_segment]) { + if (os_aio_thread_buffer[global_segment]) + ut_free(os_aio_thread_buffer[global_segment]); + + os_aio_thread_buffer[global_segment] = ut_malloc(total_len + UNIV_PAGE_SIZE); + os_aio_thread_buffer_size[global_segment] = total_len + UNIV_PAGE_SIZE; + } + combined_buf2 = os_aio_thread_buffer[global_segment]; ut_a(combined_buf2); @@ -3869,6 +3934,9 @@ consecutive_loop: this assumes that there is just one i/o-handler thread serving a single segment of slots! */ + ut_a(slot->reserved); + ut_a(slot->status == OS_AIO_ISSUED); + os_mutex_exit(array->mutex); if (slot->type == OS_FILE_WRITE && n_consecutive > 1) { @@ -3924,16 +3992,13 @@ consecutive_loop: } } - if (combined_buf2) { - ut_free(combined_buf2); - } - os_mutex_enter(array->mutex); /* Mark the i/os done in slots */ for (i = 0; i < n_consecutive; i++) { - consecutive_ios[i]->io_already_done = TRUE; + ut_a(consecutive_ios[i]->status == OS_AIO_ISSUED); + consecutive_ios[i]->status = OS_AIO_DONE; } /* We return the messages for the first slot now, and if there were @@ -3943,6 +4008,8 @@ consecutive_loop: slot_io_done: ut_a(slot->reserved); + ut_a(slot->status == OS_AIO_DONE); + slot->status = OS_AIO_CLAIMED; *message1 = slot->message1; *message2 = slot->message2; |