Diffstat (limited to 'storage/xtradb/os/os0file.c')
-rw-r--r--  storage/xtradb/os/os0file.c | 195
1 file changed, 131 insertions(+), 64 deletions(-)
diff --git a/storage/xtradb/os/os0file.c b/storage/xtradb/os/os0file.c
index 2d130899622..efef608b88f 100644
--- a/storage/xtradb/os/os0file.c
+++ b/storage/xtradb/os/os0file.c
@@ -73,6 +73,28 @@ UNIV_INTERN ibool os_aio_use_native_aio = FALSE;
UNIV_INTERN ibool os_aio_print_debug = FALSE;
+/* The state of an IO request in simulated AIO.
+ Protocol for simulated aio:
+ client requests IO: find a slot with reserved = FALSE. Add an entry with
+ status = OS_AIO_NOT_ISSUED.
+ IO thread wakes: find adjacent slots with reserved = TRUE and status =
+ OS_AIO_NOT_ISSUED. Change the status of those slots to
+ OS_AIO_ISSUED.
+ IO operation completes: set the status of those slots to OS_AIO_DONE. Set
+ the status of the first slot to OS_AIO_CLAIMED and
+ return the result for that slot.
+ When there are multiple read and write threads, they all compete to execute
+ the requests in the array (os_aio_array_t). This avoids the need to load
+ balance requests at the time they are made, at the cost of waking all IO
+ threads when a request is available.
+*/
+typedef enum {
+ OS_AIO_NOT_ISSUED, /* Available to be processed by an IO thread. */
+ OS_AIO_ISSUED, /* Being processed by an IO thread. */
+ OS_AIO_DONE, /* Request processed. */
+ OS_AIO_CLAIMED /* Result being returned to client. */
+} os_aio_status;
+
/* The aio array slot structure */
typedef struct os_aio_slot_struct os_aio_slot_t;
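The protocol above is a per-slot state machine. As a minimal sketch of the legal transitions (the helper below is illustrative only and not part of the patch; os_aio_array_free_slot is what returns a CLAIMED slot to NOT_ISSUED):

	/* Illustrative helper, not in the patch: the legal status
	transitions for a slot under the simulated-aio protocol. */
	static ibool
	os_aio_status_transition_ok(os_aio_status from, os_aio_status to)
	{
		switch (from) {
		case OS_AIO_NOT_ISSUED:	/* picked up by an IO thread */
			return(to == OS_AIO_ISSUED);
		case OS_AIO_ISSUED:	/* physical IO completed */
			return(to == OS_AIO_DONE);
		case OS_AIO_DONE:	/* result returned via first slot */
			return(to == OS_AIO_CLAIMED);
		case OS_AIO_CLAIMED:	/* slot freed for reuse */
			return(to == OS_AIO_NOT_ISSUED);
		}
		return(FALSE);
	}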
@@ -81,6 +103,8 @@ struct os_aio_slot_struct{
ulint pos; /* index of the slot in the aio
array */
ibool reserved; /* TRUE if this slot is reserved */
+ os_aio_status status; /* Status for current request. Valid when reserved
+ is TRUE. Used only in simulated aio. */
time_t reservation_time;/* time when reserved */
ulint len; /* length of the block to read or
write */
@@ -91,11 +115,11 @@ struct os_aio_slot_struct{
ulint offset_high; /* 32 high bits of file offset */
os_file_t file; /* file where to read or write */
const char* name; /* file name or path */
- ibool io_already_done;/* used only in simulated aio:
- TRUE if the physical i/o already
- made and only the slot message
- needs to be passed to the caller
- of os_aio_simulated_handle */
fil_node_t* message1; /* message which is given by the */
void* message2; /* the requester of an aio operation
and which can be used to identify
@@ -141,6 +165,13 @@ struct os_aio_array_struct{
/* Array of events used in simulated aio */
static os_event_t* os_aio_segment_wait_events = NULL;
+/* Number for the first global segment for reading. */
+const ulint os_aio_first_read_segment = 2;
+
+/* Number for the first global segment for writing. Set by os_aio_init to
+2 + the number of read threads. */
+ulint os_aio_first_write_segment = 0;
+
/* The aio arrays for non-ibuf i/o and ibuf i/o, as well as sync aio. These
are NULL when the module has not yet been initialized. */
static os_aio_array_t* os_aio_read_array = NULL;
@@ -149,11 +180,17 @@ static os_aio_array_t* os_aio_ibuf_array = NULL;
static os_aio_array_t* os_aio_log_array = NULL;
static os_aio_array_t* os_aio_sync_array = NULL;
+/* Per thread buffer used for merged IO requests. Used by
+os_aio_simulated_handle so that a buffer doesn't have to be allocated
+for each request. */
+static byte* os_aio_thread_buffer[SRV_MAX_N_IO_THREADS];
+static ulint os_aio_thread_buffer_size[SRV_MAX_N_IO_THREADS];
+
static ulint os_aio_n_segments = ULINT_UNDEFINED;
/* If the following is TRUE, read i/o handler threads try to
wait until a batch of new read requests have been posted */
-static ibool os_aio_recommend_sleep_for_read_threads = FALSE;
+static volatile ibool os_aio_recommend_sleep_for_read_threads = FALSE;
UNIV_INTERN ulint os_n_file_reads = 0;
UNIV_INTERN ulint os_bytes_read_since_printout = 0;
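The per-thread buffer declared above grows on demand and is then reused for every merged request a handler thread issues; since each handler thread owns exactly one global segment, no locking is needed and the buffer is freed only when it must grow. A minimal sketch of the pattern (the helper name is hypothetical; the patch inlines this logic in os_aio_simulated_handle):

	/* Hypothetical helper mirroring the allocation pattern the
	patch inlines in os_aio_simulated_handle: return this thread's
	buffer, growing it only when the current one is too small. */
	static byte*
	os_aio_get_thread_buffer(ulint global_segment, ulint size)
	{
		if (size > os_aio_thread_buffer_size[global_segment]) {
			if (os_aio_thread_buffer[global_segment]) {
				ut_free(os_aio_thread_buffer[global_segment]);
			}
			os_aio_thread_buffer[global_segment] = ut_malloc(size);
			os_aio_thread_buffer_size[global_segment] = size;
		}
		return(os_aio_thread_buffer[global_segment]);
	}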
@@ -2956,6 +2993,8 @@ os_aio_init(
for (i = 0; i < n_segments; i++) {
srv_set_io_thread_op_info(i, "not started yet");
+ os_aio_thread_buffer[i] = 0;
+ os_aio_thread_buffer_size[i] = 0;
}
n_per_seg = n / n_segments;
@@ -2964,6 +3003,7 @@ os_aio_init(
/* fprintf(stderr, "Array n per seg %lu\n", n_per_seg); */
+ os_aio_first_write_segment = os_aio_first_read_segment + n_read_segs;
os_aio_ibuf_array = os_aio_array_create(n_per_seg, 1);
srv_io_thread_function[0] = "insert buffer thread";
@@ -2972,14 +3012,14 @@ os_aio_init(
srv_io_thread_function[1] = "log thread";
- os_aio_read_array = os_aio_array_create(n_read_segs * n_per_seg,
+ os_aio_read_array = os_aio_array_create(n_per_seg,
n_read_segs);
for (i = 2; i < 2 + n_read_segs; i++) {
ut_a(i < SRV_MAX_N_IO_THREADS);
srv_io_thread_function[i] = "read thread";
}
- os_aio_write_array = os_aio_array_create(n_write_segs * n_per_seg,
+ os_aio_write_array = os_aio_array_create(n_per_seg,
n_write_segs);
for (i = 2 + n_read_segs; i < n_segments; i++) {
ut_a(i < SRV_MAX_N_IO_THREADS);
@@ -3225,7 +3265,8 @@ loop:
slot->buf = buf;
slot->offset = offset;
slot->offset_high = offset_high;
- slot->io_already_done = FALSE;
+ slot->status = OS_AIO_NOT_ISSUED;
#ifdef WIN_ASYNC_IO
control = &(slot->control);
@@ -3256,6 +3297,7 @@ os_aio_array_free_slot(
ut_ad(slot->reserved);
slot->reserved = FALSE;
+ slot->status = OS_AIO_NOT_ISSUED;
array->n_reserved--;
@@ -3292,16 +3334,18 @@ os_aio_simulated_wake_handler_thread(
segment = os_aio_get_array_and_local_segment(&array, global_segment);
- n = array->n_slots / array->n_segments;
+ n = array->n_slots;
- /* Look through n slots after the segment * n'th slot */
+ /* Look through all n slots in the array */
os_mutex_enter(array->mutex);
for (i = 0; i < n; i++) {
- slot = os_aio_array_get_nth_slot(array, i + segment * n);
+ slot = os_aio_array_get_nth_slot(array, i);
- if (slot->reserved) {
+ if (slot->reserved &&
+ (slot->status == OS_AIO_NOT_ISSUED ||
+ slot->status == OS_AIO_DONE)) {
/* Found an i/o request */
break;
@@ -3311,7 +3355,25 @@ os_aio_simulated_wake_handler_thread(
os_mutex_exit(array->mutex);
if (i < n) {
- os_event_set(os_aio_segment_wait_events[global_segment]);
+ if (array == os_aio_ibuf_array) {
+ os_event_set(os_aio_segment_wait_events[0]);
+
+ } else if (array == os_aio_log_array) {
+ os_event_set(os_aio_segment_wait_events[1]);
+
+ } else if (array == os_aio_read_array) {
+ ulint x;
+ for (x = os_aio_first_read_segment;
+      x < os_aio_first_write_segment; x++) {
+ os_event_set(os_aio_segment_wait_events[x]);
+ }
+
+ } else if (array == os_aio_write_array) {
+ ulint x;
+ for (x = os_aio_first_write_segment;
+      x < os_aio_n_segments; x++) {
+ os_event_set(os_aio_segment_wait_events[x]);
+ }
+
+ } else {
+ ut_a(0);
+ }
}
}
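With the layout set up by os_aio_init, the global segments the branches above rely on are numbered: 0 for the insert buffer array, 1 for the log array, os_aio_first_read_segment (2) through os_aio_first_write_segment - 1 for the read array, and os_aio_first_write_segment through os_aio_n_segments - 1 for the write array. A sketch of that mapping, assuming this layout (illustrative only; the real lookup is os_aio_get_array_and_local_segment):

	/* Illustrative mapping from a global segment number to its aio
	array, assuming the segment layout set up by os_aio_init. */
	static os_aio_array_t*
	os_aio_segment_to_array(ulint global_segment)
	{
		if (global_segment == 0) {
			return(os_aio_ibuf_array);
		} else if (global_segment == 1) {
			return(os_aio_log_array);
		} else if (global_segment < os_aio_first_write_segment) {
			return(os_aio_read_array);
		} else {
			ut_a(global_segment < os_aio_n_segments);
			return(os_aio_write_array);
		}
	}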
@@ -3322,8 +3384,6 @@ void
os_aio_simulated_wake_handler_threads(void)
/*=======================================*/
{
- ulint i;
-
if (os_aio_use_native_aio) {
/* We do not use simulated aio: do nothing */
@@ -3332,9 +3392,10 @@ os_aio_simulated_wake_handler_threads(void)
os_aio_recommend_sleep_for_read_threads = FALSE;
- for (i = 0; i < os_aio_n_segments; i++) {
- os_aio_simulated_wake_handler_thread(i);
- }
+ /* One call per array suffices: os_aio_simulated_wake_handler_thread
+ signals every segment of the target array when it finds a pending
+ request there. */
+ os_aio_simulated_wake_handler_thread(0);
+ os_aio_simulated_wake_handler_thread(1);
+ os_aio_simulated_wake_handler_thread(os_aio_first_read_segment);
+ os_aio_simulated_wake_handler_thread(os_aio_first_write_segment);
}
/**************************************************************************
@@ -3606,7 +3667,7 @@ os_aio_windows_handle(
ut_ad(os_aio_validate());
ut_ad(segment < array->n_segments);
- n = array->n_slots / array->n_segments;
+ n = array->n_slots;
if (array == os_aio_sync_array) {
os_event_wait(os_aio_array_get_nth_slot(array, pos)->event);
@@ -3615,12 +3676,12 @@ os_aio_windows_handle(
srv_set_io_thread_op_info(orig_seg, "wait Windows aio");
- i = os_event_wait_multiple(n,
- (array->native_events)
- + segment * n);
+ i = os_event_wait_multiple(n, array->native_events);
}
os_mutex_enter(array->mutex);
- slot = os_aio_array_get_nth_slot(array, i + segment * n);
+ slot = os_aio_array_get_nth_slot(array, i);
ut_a(slot->reserved);
@@ -3685,10 +3746,13 @@ os_aio_simulated_handle(
os_aio_slot_t* slot;
os_aio_slot_t* slot2;
os_aio_slot_t* consecutive_ios[OS_AIO_MERGE_N_CONSECUTIVE];
+ os_aio_slot_t* lowest_request;
+ os_aio_slot_t* oldest_request;
ulint n_consecutive;
ulint total_len;
ulint offs;
ulint lowest_offset;
+ ulint oldest_offset;
ulint biggest_age;
ulint age;
byte* combined_buf;
@@ -3696,6 +3760,7 @@ os_aio_simulated_handle(
ibool ret;
ulint n;
ulint i;
+ time_t now;
segment = os_aio_get_array_and_local_segment(&array, global_segment);
@@ -3708,7 +3773,7 @@ restart:
ut_ad(os_aio_validate());
ut_ad(segment < array->n_segments);
- n = array->n_slots / array->n_segments;
+ n = array->n_slots;
- /* Look through n slots after the segment * n'th slot */
+ /* Look through all n slots in the array */
@@ -3730,9 +3795,9 @@ restart:
done */
for (i = 0; i < n; i++) {
- slot = os_aio_array_get_nth_slot(array, i + segment * n);
+ slot = os_aio_array_get_nth_slot(array, i);
- if (slot->reserved && slot->io_already_done) {
+ if (slot->reserved && slot->status == OS_AIO_DONE) {
if (os_aio_print_debug) {
fprintf(stderr,
@@ -3754,67 +3819,57 @@ restart:
then pick the one at the lowest offset. */
biggest_age = 0;
- lowest_offset = ULINT_MAX;
+ now = time(NULL);
+ oldest_request = lowest_request = NULL;
+ oldest_offset = lowest_offset = ULINT_MAX;
+ /* Find the oldest request and the request with the smallest offset */
for (i = 0; i < n; i++) {
- slot = os_aio_array_get_nth_slot(array, i + segment * n);
+ slot = os_aio_array_get_nth_slot(array, i);
- if (slot->reserved) {
- age = (ulint)difftime(time(NULL),
- slot->reservation_time);
+ if (slot->reserved && slot->status == OS_AIO_NOT_ISSUED) {
+ age = (ulint)difftime(now, slot->reservation_time);
if ((age >= 2 && age > biggest_age)
|| (age >= 2 && age == biggest_age
- && slot->offset < lowest_offset)) {
+ && slot->offset < oldest_offset)) {
/* Found an i/o request */
- consecutive_ios[0] = slot;
-
- n_consecutive = 1;
-
biggest_age = age;
- lowest_offset = slot->offset;
+ oldest_request = slot;
+ oldest_offset = slot->offset;
}
- }
- }
-
- if (n_consecutive == 0) {
- /* There were no old requests. Look for an i/o request at the
- lowest offset in the array (we ignore the high 32 bits of the
- offset in these heuristics) */
-
- lowest_offset = ULINT_MAX;
-
- for (i = 0; i < n; i++) {
- slot = os_aio_array_get_nth_slot(array,
- i + segment * n);
-
- if (slot->reserved && slot->offset < lowest_offset) {
+ /* Look for an i/o request at the lowest offset in the array
+ (we ignore the high 32 bits of the offset) */
+ if (slot->offset < lowest_offset) {
/* Found an i/o request */
- consecutive_ios[0] = slot;
-
- n_consecutive = 1;
-
+ lowest_request = slot;
lowest_offset = slot->offset;
}
}
}
- if (n_consecutive == 0) {
+ if (!lowest_request && !oldest_request) {
/* No i/o requested at the moment */
goto wait_for_io;
}
- slot = consecutive_ios[0];
+ if (oldest_request) {
+ slot = oldest_request;
+ } else {
+ slot = lowest_request;
+ }
+ consecutive_ios[0] = slot;
+ n_consecutive = 1;
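Requests that have waited at least two seconds therefore take priority, oldest first with ties broken by lower offset; only when nothing is that old does the handler take the request at the lowest offset. The aging test from the scan above can be restated as a predicate (illustrative only, not part of the patch):

	/* Illustrative restatement of the age test in the scan above:
	TRUE if a candidate should replace the current "oldest" request. */
	static ibool
	os_aio_older_request(ulint cand_age, ulint cand_offset,
			     ulint best_age, ulint best_offset)
	{
		return(cand_age >= 2
		       && (cand_age > best_age
			   || (cand_age == best_age
			       && cand_offset < best_offset)));
	}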
/* Check if there are several consecutive blocks to read or write */
consecutive_loop:
for (i = 0; i < n; i++) {
- slot2 = os_aio_array_get_nth_slot(array, i + segment * n);
+ slot2 = os_aio_array_get_nth_slot(array, i);
if (slot2->reserved && slot2 != slot
&& slot2->offset == slot->offset + slot->len
@@ -3822,7 +3877,8 @@ consecutive_loop:
&& slot->offset + slot->len > slot->offset
&& slot2->offset_high == slot->offset_high
&& slot2->type == slot->type
- && slot2->file == slot->file) {
+ && slot2->file == slot->file
+ && slot2->status == OS_AIO_NOT_ISSUED) {
/* Found a consecutive i/o request */
@@ -3851,6 +3907,8 @@ consecutive_loop:
for (i = 0; i < n_consecutive; i++) {
total_len += consecutive_ios[i]->len;
+ ut_a(consecutive_ios[i]->status == OS_AIO_NOT_ISSUED);
+ consecutive_ios[i]->status = OS_AIO_ISSUED;
}
if (n_consecutive == 1) {
@@ -3858,7 +3916,14 @@ consecutive_loop:
combined_buf = slot->buf;
combined_buf2 = NULL;
} else {
- combined_buf2 = ut_malloc(total_len + UNIV_PAGE_SIZE);
+ if ((total_len + UNIV_PAGE_SIZE)
+     > os_aio_thread_buffer_size[global_segment]) {
+
+ if (os_aio_thread_buffer[global_segment]) {
+ ut_free(os_aio_thread_buffer[global_segment]);
+ }
+
+ os_aio_thread_buffer[global_segment] = ut_malloc(total_len + UNIV_PAGE_SIZE);
+ os_aio_thread_buffer_size[global_segment] = total_len + UNIV_PAGE_SIZE;
+ }
+ combined_buf2 = os_aio_thread_buffer[global_segment];
ut_a(combined_buf2);
@@ -3869,6 +3934,9 @@ consecutive_loop:
this assumes that there is just one i/o-handler thread serving
a single segment of slots! */
+ ut_a(slot->reserved);
+ ut_a(slot->status == OS_AIO_ISSUED);
+
os_mutex_exit(array->mutex);
if (slot->type == OS_FILE_WRITE && n_consecutive > 1) {
@@ -3924,16 +3992,13 @@ consecutive_loop:
}
}
- if (combined_buf2) {
- ut_free(combined_buf2);
- }
-
os_mutex_enter(array->mutex);
/* Mark the i/os done in slots */
for (i = 0; i < n_consecutive; i++) {
- consecutive_ios[i]->io_already_done = TRUE;
+ ut_a(consecutive_ios[i]->status == OS_AIO_ISSUED);
+ consecutive_ios[i]->status = OS_AIO_DONE;
}
/* We return the messages for the first slot now, and if there were
@@ -3943,6 +4008,8 @@ consecutive_loop:
slot_io_done:
ut_a(slot->reserved);
+ ut_a(slot->status == OS_AIO_DONE);
+ slot->status = OS_AIO_CLAIMED;
*message1 = slot->message1;
*message2 = slot->message2;