summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2016-09-19 12:31:20 +0000
committerVladislav Vaintroub <wlad@mariadb.com>2016-09-19 12:31:20 +0000
commit04b6b0732ddaac933c39353fcb4e1a9f42c2c59f (patch)
tree092c0265fed7fbd4443749c08808b24435b23ff2
parent62ed88017c8a137bd2b57cb55de1badf36f3906a (diff)
downloadmariadb-git-bb-10.2-jan-wlad.tar.gz
Windows : completion port based asynchronous IO.bb-10.2-jan-wlad
-rw-r--r--storage/innobase/os/os0file.cc399
1 files changed, 173 insertions, 226 deletions
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index dfe5021da8c..3cd92282aa0 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -162,12 +162,10 @@ static ulint os_innodb_umask = 0;
#ifndef ECANCELED
#define ECANCELED 125
#endif
-
-/* On Windows when using native AIO the number of AIO requests
-that a thread can handle at a given time is limited to 32
-i.e.: SRV_N_PENDING_IOS_PER_THREAD */
-#define SRV_N_PENDING_IOS_PER_THREAD OS_AIO_N_PENDING_IOS_PER_THREAD
-
+static HANDLE completion_port;
+static HANDLE read_completion_port;
+static DWORD fls_sync_io = FLS_OUT_OF_INDEXES;
+#define IOCP_SHUTDOWN_KEY (ULONG_PTR)-1
#endif /* _WIN32 */
#ifndef UNIV_HOTBACKUP
@@ -246,8 +244,19 @@ mysql_pfs_key_t innodb_log_file_key;
mysql_pfs_key_t innodb_temp_file_key;
#endif /* UNIV_PFS_IO */
+class AIO;
+
/** The asynchronous I/O context */
struct Slot {
+
+#ifdef WIN_ASYNC_IO
+ /** Windows control block for the aio request
+ must be at the very start of Slot, so we can
+ cast Slot* to OVERLAPPED*
+ */
+ OVERLAPPED control;
+#endif
+
/** index of the slot in the aio array */
uint16_t pos;
@@ -297,11 +306,6 @@ struct Slot {
dberr_t err;
#ifdef WIN_ASYNC_IO
- /** handle object we need in the OVERLAPPED struct */
- HANDLE handle;
-
- /** Windows control block for the aio request */
- OVERLAPPED control;
/** bytes written/read */
DWORD n_bytes;
@@ -309,6 +313,8 @@ struct Slot {
/** length of the block to read or write */
DWORD len;
+ /** aio array containing this slot */
+ AIO *array;
#elif defined(LINUX_NATIVE_AIO)
/** Linux control block for aio */
struct iocb control;
@@ -510,31 +516,11 @@ public:
#endif /* LINUX_NATIVE_AIO */
#ifdef WIN_ASYNC_IO
- /** Wakes up all async i/o threads in the array in Windows async I/O at
- shutdown. */
- void signal()
- {
- for (ulint i = 0; i < m_slots.size(); ++i) {
- SetEvent(m_slots[i].handle);
- }
- }
-
+
/** Wake up all AIO threads in Windows native aio */
- static void wake_at_shutdown()
- {
- s_reads->signal();
-
- if (s_writes != NULL) {
- s_writes->signal();
- }
-
- if (s_ibuf != NULL) {
- s_ibuf->signal();
- }
-
- if (s_log != NULL) {
- s_log->signal();
- }
+ static void wake_at_shutdown() {
+ PostQueuedCompletionStatus(completion_port, 0, IOCP_SHUTDOWN_KEY, NULL);
+ PostQueuedCompletionStatus(read_completion_port, 0, IOCP_SHUTDOWN_KEY, NULL);
}
#endif /* WIN_ASYNC_IO */
@@ -545,33 +531,7 @@ public:
threads are not left sleeping! */
static void simulated_put_read_threads_to_sleep();
- /** The non asynchronous IO array.
- @return the synchronous AIO array instance. */
- static AIO* sync_array()
- MY_ATTRIBUTE((warn_unused_result))
- {
- return(s_sync);
- }
-
- /**
- Get the AIO handles for a segment.
- @param[in] segment The local segment.
- @return the handles for the segment. */
- HANDLE* handles(ulint segment)
- MY_ATTRIBUTE((warn_unused_result))
- {
- ut_ad(segment < m_handles->size() / slots_per_segment());
-
- return(&(*m_handles)[segment * slots_per_segment()]);
- }
-
- /** @return true if no slots are reserved */
- bool is_empty() const
- MY_ATTRIBUTE((warn_unused_result))
- {
- ut_ad(is_mutex_owned());
- return(m_n_reserved == 0);
- }
+
#endif /* _WIN32 */
/** Create an instance using new(std::nothrow)
@@ -728,14 +688,6 @@ private:
the ibuf segment */
ulint m_n_reserved;
-#ifdef _WIN32
- typedef std::vector<HANDLE, ut_allocator<HANDLE> > Handles;
-
- /** Pointer to an array of OS native event handles where
- we copied the handles from slots, in the same order. This
- can be used in WaitForMultipleObjects; used only in Windows */
- Handles* m_handles;
-#endif /* _WIN32 */
#if defined(LINUX_NATIVE_AIO)
typedef std::vector<io_event> IOEvents;
@@ -1645,11 +1597,7 @@ AIO::release(Slot* slot)
os_event_set(m_is_empty);
}
-#ifdef WIN_ASYNC_IO
-
- ResetEvent(slot->handle);
-
-#elif defined(LINUX_NATIVE_AIO)
+#if defined(LINUX_NATIVE_AIO)
if (srv_use_native_aio) {
memset(&slot->control, 0x0, sizeof(slot->control));
@@ -4144,6 +4092,57 @@ os_aio_simulated_put_read_threads_to_sleep()
#include <WinIoCtl.h>
+/*
+Windows : Handling synchronous IO on files opened asynchronously.
+
+If file is opened for asynchronous IO (FILE_FLAG_OVERLAPPED) and also bound to
+a completion port, then every IO on this file would normally be enqueued to the
+completion port. Sometimes however we would like to do a synchronous IO. This is
+possible if we initialitze have overlapped.hEvent with a valid event and set its
+lowest order bit to 1 (see MSDN ReadFile and WriteFile description for more info)
+
+We'll create this special event once for each thread and store in thread local
+storage.
+*/
+
+
+static void __stdcall win_free_syncio_event(void *data) {
+ if (data) {
+ CloseHandle((HANDLE)data);
+ }
+}
+
+
+/*
+Initialize tls index.for event handle used for synchronized IO on files that
+might be opened with FILE_FLAG_OVERLAPPED.
+*/
+static void win_init_syncio_event() {
+ fls_sync_io = FlsAlloc(win_free_syncio_event);
+ ut_a(fls_sync_io != FLS_OUT_OF_INDEXES);
+}
+
+
+/*
+Retrieve per-thread event for doing synchronous io on asyncronously opened files
+*/
+static HANDLE win_get_syncio_event()
+{
+ HANDLE h;
+
+ h = (HANDLE)FlsGetValue(fls_sync_io);
+ if (h) {
+ return h;
+ }
+ h = CreateEventA(NULL, FALSE, FALSE, NULL);
+ ut_a(h);
+ /* Set low-order bit to keeps I/O completion from being queued */
+ h = (HANDLE)((uintptr_t)h | 1);
+ FlsSetValue(fls_sync_io, h);
+ return h;
+}
+
+
/** Do the read/write
@param[in] request The IO context and type
@return the number of bytes read/written or negative value on error */
@@ -4154,6 +4153,7 @@ SyncFileIO::execute(const IORequest& request)
memset(&seek, 0x0, sizeof(seek));
+ seek.hEvent = win_get_syncio_event();
seek.Offset = (DWORD) m_offset & 0xFFFFFFFF;
seek.OffsetHigh = (DWORD) (m_offset >> 32);
@@ -4169,6 +4169,10 @@ SyncFileIO::execute(const IORequest& request)
ret = WriteFile(m_fh, m_buf,
static_cast<DWORD>(m_n), &n_bytes, &seek);
}
+ if (!ret && (GetLastError() == ERROR_IO_PENDING)) {
+ /* Wait for async io to complete */
+ ret = GetOverlappedResult(m_fh, &seek, &n_bytes, TRUE);
+ }
return(ret ? static_cast<ssize_t>(n_bytes) : -1);
}
@@ -4180,7 +4184,7 @@ ssize_t
SyncFileIO::execute(Slot* slot)
{
BOOL ret;
-
+ slot->control.hEvent = win_get_syncio_event();
if (slot->type.is_read()) {
ret = ReadFile(
@@ -4195,10 +4199,34 @@ SyncFileIO::execute(Slot* slot)
&slot->n_bytes, &slot->control);
}
-
+ if (!ret && (GetLastError() == ERROR_IO_PENDING)) {
+ /* Wait for async io to complete */
+ ret = GetOverlappedResult(slot->file, &slot->control, &slot->n_bytes, TRUE);
+ }
return(ret ? slot->n_bytes : -1);
}
+/* Startup/shutdown */
+
+struct WinIoInit
+{
+ WinIoInit() {
+ completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ read_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); ut_a(completion_port && read_completion_port);
+ fls_sync_io = FlsAlloc(win_free_syncio_event);
+ ut_a(fls_sync_io != FLS_OUT_OF_INDEXES);
+ }
+
+ ~WinIoInit() {
+ CloseHandle(completion_port);
+ CloseHandle(read_completion_port);
+ FlsFree(fls_sync_io);
+ }
+};
+
+/* Ensures proper initialization and shutdown */
+static WinIoInit win_io_init;
+
/** Check if the file system supports sparse files.
@param[in] name File name
@return true if the file system supports sparse files */
@@ -4922,6 +4950,10 @@ os_file_create_func(
*success = true;
+ if (srv_use_native_aio && ((attributes & FILE_FLAG_OVERLAPPED) != 0)) {
+ /* Bind the file handle to completion port */
+ ut_a(CreateIoCompletionPort(file, completion_port, 0, 0));
+ }
DWORD temp;
/* This is a best effort use case, if it fails then
@@ -6447,8 +6479,6 @@ AIO::AIO(
# ifdef LINUX_NATIVE_AIO
,m_aio_ctx(),
m_events(m_slots.size())
-# elif defined(_WIN32)
- ,m_handles()
# endif /* LINUX_NATIVE_AIO */
{
ut_a(n > 0);
@@ -6480,13 +6510,7 @@ AIO::init_slots()
#ifdef WIN_ASYNC_IO
- slot.handle = CreateEvent(NULL, TRUE, FALSE, NULL);
-
- OVERLAPPED* over = &slot.control;
-
- over->hEvent = slot.handle;
-
- (*m_handles)[i] = over->hEvent;
+ slot.array = this;
#elif defined(LINUX_NATIVE_AIO)
@@ -6555,11 +6579,6 @@ AIO::init()
{
ut_a(!m_slots.empty());
-#ifdef _WIN32
- ut_a(m_handles == NULL);
-
- m_handles = UT_NEW_NOKEY(Handles(m_slots.size()));
-#endif /* _WIN32 */
if (srv_use_native_aio) {
#ifdef LINUX_NATIVE_AIO
@@ -6613,16 +6632,6 @@ AIO::create(
/** AIO destructor */
AIO::~AIO()
{
-#ifdef WIN_ASYNC_IO
- for (ulint i = 0; i < m_slots.size(); ++i) {
- CloseHandle(m_slots[i].handle);
- }
-#endif /* WIN_ASYNC_IO */
-
-#ifdef _WIN32
- UT_DELETE(m_handles);
-#endif /* _WIN32 */
-
mutex_destroy(&m_mutex);
os_event_destroy(m_not_full);
@@ -6806,11 +6815,6 @@ os_aio_init(
/* Maximum number of pending aio operations allowed per segment */
ulint limit = 8 * OS_AIO_N_PENDING_IOS_PER_THREAD;
-#ifdef _WIN32
- if (srv_use_native_aio) {
- limit = SRV_N_PENDING_IOS_PER_THREAD;
- }
-#endif /* _WIN32 */
ut_a(block_cache == NULL);
@@ -7145,8 +7149,6 @@ AIO::reserve_slot(
control = &slot->control;
control->Offset = (DWORD) offset & 0xFFFFFFFF;
control->OffsetHigh = (DWORD) (offset >> 32);
-
- ResetEvent(slot->handle);
}
#elif defined(LINUX_NATIVE_AIO)
@@ -7331,6 +7333,10 @@ therefore no other thread is allowed to do the freeing!
@param[out] m2 callback message
@param[out] type OS_FILE_WRITE or ..._READ
@return DB_SUCCESS or error code */
+
+#define READ_SEGMENT(s) (s < srv_n_read_io_threads)
+#define WRITE_SEGMENT(s) !READ_SEGMENT(s)
+
static
dberr_t
os_aio_windows_handler(
@@ -7340,86 +7346,74 @@ os_aio_windows_handler(
void** m2,
IORequest* type)
{
- Slot* slot;
+ Slot* slot= 0;
dberr_t err;
- AIO* array;
- ulint orig_seg = segment;
- if (segment == ULINT_UNDEFINED) {
- segment = 0;
- array = AIO::sync_array();
- } else {
- segment = AIO::get_array_and_local_segment(&array, segment);
- }
+ BOOL ret;
+ ULONG_PTR key;
+
+ ut_a(segment != ULINT_UNDEFINED);
/* NOTE! We only access constant fields in os_aio_array. Therefore
we do not have to acquire the protecting mutex yet */
ut_ad(os_aio_validate_skip());
- if (array == AIO::sync_array()) {
+ HANDLE port = READ_SEGMENT(segment) ? read_completion_port : completion_port;
+ for (;;) {
+ DWORD len;
+ ret = GetQueuedCompletionStatus(port, &len, &key,
+ (OVERLAPPED **)&slot, INFINITE);
- WaitForSingleObject(array->at(pos)->handle, INFINITE);
+ /* If shutdown key was received, repost the shutdown message and exit */
+ if (ret && key == IOCP_SHUTDOWN_KEY) {
+ PostQueuedCompletionStatus(port, 0, key, NULL);
+ *m1 = NULL;
+ *m2 = NULL;
+ return (DB_SUCCESS);
+ }
- } else {
- if (orig_seg != ULINT_UNDEFINED) {
- srv_set_io_thread_op_info(orig_seg, "wait Windows aio");
- }
-
- DWORD handle_count = (DWORD)array->slots_per_segment();
- HANDLE *handle_array = array->handles(segment);
- pos = WaitForMultipleObjects(
- handle_count,
- handle_array,
- FALSE, INFINITE);
- if (pos == WAIT_FAILED) {
- DWORD last_error = GetLastError();
- ib::error()
- << "WaitForMultipleObjects() failed with error "
- << last_error;
- ut_error;
+ ut_a(slot);
+ if (!ret) {
+ /* IO failed */
+ break;
}
- }
- array->acquire();
+ slot->n_bytes= len;
- if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
- && array->is_empty()
- && !buf_page_cleaner_is_active) {
+ if (WRITE_SEGMENT(segment) && slot->type.is_read()) {
+ /*
+ Redirect read completions to the dedicated completion port
+ and thread. We need to split read and write threads. If we do not
+ do that, and just allow all io threads process all IO, it is possible
+ to get stuck in a deadlock in buffer pool code,
- *m1 = NULL;
- *m2 = NULL;
+ Currently, the problem is solved this way - "write io" threads
+ always get all completion notifications, from both async reads and
+ writes. Write completion is handled in the same thread that gets it.
+ Read completion is forwarded via PostQueueCompletionStatus())
+ to the second completion port dedicated solely to reads. One of the
+ "read io" threads waiting on this port will finally handle the IO.
- array->release();
-
- return(DB_SUCCESS);
+ Forwarding IO completion this way costs a context switch , and this
+ seems tolerable since asynchronous reads are by far less frequent.
+ */
+ ut_a(PostQueuedCompletionStatus(read_completion_port, len, key, &slot->control));
+ }
+ else {
+ break;
+ }
}
- ulint n = array->slots_per_segment();
-
- ut_a(pos >= WAIT_OBJECT_0 && pos <= WAIT_OBJECT_0 + n);
-
- slot = array->at(pos + segment * n);
-
ut_a(slot->is_reserved);
- if (orig_seg != ULINT_UNDEFINED) {
- srv_set_io_thread_op_info(
- orig_seg, "get windows aio return value");
- }
-
- BOOL ret;
-
- ret = GetOverlappedResult(
- slot->file, &slot->control, &slot->n_bytes, TRUE);
-
*m1 = slot->m1;
*m2 = slot->m2;
*type = slot->type;
- BOOL retry = FALSE;
+ bool retry = false;
if (ret && slot->n_bytes == slot->len) {
@@ -7434,11 +7428,9 @@ os_aio_windows_handler(
err = DB_IO_ERROR;
}
- array->release();
if (retry) {
- /* Retry failed read/write operation synchronously.
- No need to hold array->m_mutex. */
+ /* Retry failed read/write operation synchronously. */
#ifdef UNIV_PFS_IO
/* This read/write does not go through os_file_read
@@ -7460,22 +7452,6 @@ os_aio_windows_handler(
register_pfs_file_io_end(locker, slot->len);
#endif /* UNIV_PFS_IO */
- if (n_bytes < 0 && GetLastError() == ERROR_IO_PENDING) {
- /* AIO was queued successfully!
- We want a synchronous I/O operation on a
- file where we also use async I/O: in Windows
- we must use the same wait mechanism as for
- async I/O */
-
- BOOL ret;
-
- ret = GetOverlappedResult(
- slot->file, &slot->control, &slot->n_bytes,
- TRUE);
-
- n_bytes = ret ? slot->n_bytes : -1;
- }
-
err = (n_bytes == slot->len) ? DB_SUCCESS : DB_IO_ERROR;
}
@@ -7483,8 +7459,15 @@ os_aio_windows_handler(
err = AIOHandler::post_io_processing(slot);
}
- array->release_with_mutex(slot);
+ ut_a(slot->array);
+ slot->array->release_with_mutex(slot);
+ if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
+ && !buf_page_cleaner_is_active
+ && os_aio_all_slots_free()) {
+ /* Last IO, wakeup other io threads */
+ AIO::wake_at_shutdown();
+ }
return(err);
}
#endif /* WIN_ASYNC_IO */
@@ -7527,7 +7510,6 @@ os_aio_func(
actual page size does not decrease. */
{
#ifdef WIN_ASYNC_IO
- void* buffer = NULL;
BOOL ret = TRUE;
#endif /* WIN_ASYNC_IO */
@@ -7543,24 +7525,7 @@ os_aio_func(
DBUG_EXECUTE_IF("ib_os_aio_func_io_failure_28",
mode = OS_AIO_SYNC; os_has_said_disk_full = FALSE;);
- if (mode == OS_AIO_SYNC
-#ifdef WIN_ASYNC_IO
- && !srv_use_native_aio
-#endif /* WIN_ASYNC_IO */
- ) {
- /* This is actually an ordinary synchronous read or write:
- no need to use an i/o-handler thread. NOTE that if we use
- Windows async i/o, Windows does not allow us to use
- ordinary synchronous os_file_read etc. on the same file,
- therefore we have built a special mechanism for synchronous
- wait in the Windows case.
- Also note that the Performance Schema instrumentation has
- been performed by current os_aio_func()'s wrapper function
- pfs_os_aio_func(). So we would no longer need to call
- Performance Schema instrumented os_file_read() and
- os_file_write(). Instead, we should use os_file_read_func()
- and os_file_write_func() */
-
+ if (mode == OS_AIO_SYNC) {
if (type.is_read()) {
return(os_file_read_func(type, file, buf, offset, n));
}
@@ -7627,32 +7592,14 @@ try_again:
}
#ifdef WIN_ASYNC_IO
- if (srv_use_native_aio) {
- if ((ret && slot->len == slot->n_bytes)
- || (!ret && GetLastError() == ERROR_IO_PENDING)) {
- /* aio was queued successfully! */
-
- if (mode == OS_AIO_SYNC) {
- IORequest dummy_type;
- void* dummy_mess2;
- struct fil_node_t* dummy_mess1;
-
- /* We want a synchronous i/o operation on a
- file where we also use async i/o: in Windows
- we must use the same wait mechanism as for
- async i/o */
-
- return(os_aio_windows_handler(
- ULINT_UNDEFINED, slot->pos,
- &dummy_mess1, &dummy_mess2,
- &dummy_type));
- }
+ if ((ret && slot->len == slot->n_bytes)
+ || (!ret && GetLastError() == ERROR_IO_PENDING)) {
+ /* aio completed or was queued successfully! */
+ return(DB_SUCCESS);
+ }
- return(DB_SUCCESS);
- }
+ goto err_exit;
- goto err_exit;
- }
#endif /* WIN_ASYNC_IO */
/* AIO request was queued successfully! */