Diffstat (limited to 'storage/innobase/os/os0file.cc')
-rw-r--r-- | storage/innobase/os/os0file.cc | 3634 |
1 file changed, 227 insertions, 3407 deletions
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc index f9028122f8d..361e34549c7 100644 --- a/storage/innobase/os/os0file.cc +++ b/storage/innobase/os/os0file.cc @@ -53,6 +53,7 @@ Created 10/21/1995 Heikki Tuuri #include "os0thread.h" #include <vector> +#include <tpool_structs.h> #ifdef LINUX_NATIVE_AIO #include <libaio.h> @@ -78,601 +79,80 @@ Created 10/21/1995 Heikki Tuuri #endif -/** Insert buffer segment id */ -static const ulint IO_IBUF_SEGMENT = 0; - -/** Log segment id */ -static const ulint IO_LOG_SEGMENT = 1; - -/** Number of retries for partial I/O's */ -static const ulint NUM_RETRIES_ON_PARTIAL_IO = 10; - -/* This specifies the file permissions InnoDB uses when it creates files in -Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to -my_umask */ - -#ifndef _WIN32 -/** Umask for creating files */ -static ulint os_innodb_umask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; -#else -/** Umask for creating files */ -static ulint os_innodb_umask = 0; -static HANDLE data_completion_port; -static HANDLE log_completion_port; - -static DWORD fls_sync_io = FLS_OUT_OF_INDEXES; -#define IOCP_SHUTDOWN_KEY (ULONG_PTR)-1 -#endif /* _WIN32 */ - -/** In simulated aio, merge at most this many consecutive i/os */ -static const ulint OS_AIO_MERGE_N_CONSECUTIVE = 64; - -/** Flag indicating if the page_cleaner is in active state. */ -extern bool buf_page_cleaner_is_active; - -#ifdef WITH_INNODB_DISALLOW_WRITES -#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event) -#else -#define WAIT_ALLOW_WRITES() do { } while (0) -#endif /* WITH_INNODB_DISALLOW_WRITES */ - -/********************************************************************** - -InnoDB AIO Implementation: -========================= - -We support native AIO for Windows and Linux. For rest of the platforms -we simulate AIO by special IO-threads servicing the IO-requests. - -Simulated AIO: -============== - -On platforms where we 'simulate' AIO, the following is a rough explanation -of the high level design. -There are four io-threads (for ibuf, log, read, write). -All synchronous IO requests are serviced by the calling thread using -os_file_write/os_file_read. The Asynchronous requests are queued up -in an array (there are four such arrays) by the calling thread. -Later these requests are picked up by the IO-thread and are serviced -synchronously. - -Windows native AIO: -================== - -If srv_use_native_aio is not set then Windows follow the same -code as simulated AIO. If the flag is set then native AIO interface -is used. On windows, one of the limitation is that if a file is opened -for AIO no synchronous IO can be done on it. Therefore we have an -extra fifth array to queue up synchronous IO requests. -There are innodb_file_io_threads helper threads. These threads work -on the four arrays mentioned above in Simulated AIO. No thread is -required for the sync array. -If a synchronous IO request is made, it is first queued in the sync -array. Then the calling thread itself waits on the request, thus -making the call synchronous. -If an AIO request is made the calling thread not only queues it in the -array but also submits the requests. The helper thread then collects -the completed IO request and calls completion routine on it. - -Linux native AIO: -================= - -If we have libaio installed on the system and innodb_use_native_aio -is set to true we follow the code path of native AIO, otherwise we -do simulated AIO. -There are innodb_file_io_threads helper threads. 
These threads work -on the four arrays mentioned above in Simulated AIO. -If a synchronous IO request is made, it is handled by calling -os_file_write/os_file_read. -If an AIO request is made the calling thread not only queues it in the -array but also submits the requests. The helper thread then collects -the completed IO request and calls completion routine on it. - -**********************************************************************/ - - -#ifdef UNIV_PFS_IO -/* Keys to register InnoDB I/O with performance schema */ -mysql_pfs_key_t innodb_data_file_key; -mysql_pfs_key_t innodb_log_file_key; -mysql_pfs_key_t innodb_temp_file_key; -#endif /* UNIV_PFS_IO */ - -class AIO; - -/** The asynchronous I/O context */ -struct Slot { - -#ifdef WIN_ASYNC_IO - /** Windows control block for the aio request - must be at the very start of Slot, so we can - cast Slot* to OVERLAPPED* - */ - OVERLAPPED control; -#endif - - /** index of the slot in the aio array */ - uint16_t pos; - - /** true if this slot is reserved */ - bool is_reserved; - - /** time when reserved */ - time_t reservation_time; - - /** buffer used in i/o */ - byte* buf; - - /** Buffer pointer used for actual IO. We advance this - when partial IO is required and not buf */ - byte* ptr; - - /** OS_FILE_READ or OS_FILE_WRITE */ - IORequest type; - - /** file offset in bytes */ - os_offset_t offset; - - /** file where to read or write */ - pfs_os_file_t file; - - /** file name or path */ - const char* name; - - /** used only in simulated aio: true if the physical i/o - already made and only the slot message needs to be passed - to the caller of os_aio_simulated_handle */ - bool io_already_done; - - /*!< file block size */ - ulint file_block_size; - - /** The file node for which the IO is requested. */ - fil_node_t* m1; - - /** the requester of an aio operation and which can be used - to identify which pending aio operation was completed */ - void* m2; - - /** AIO completion status */ - dberr_t err; - -#ifdef WIN_ASYNC_IO - - /** bytes written/read */ - DWORD n_bytes; - - /** length of the block to read or write */ - DWORD len; - - /** aio array containing this slot */ - AIO *array; -#elif defined(LINUX_NATIVE_AIO) - /** Linux control block for aio */ - struct iocb control; - - /** AIO return code */ - int ret; - - /** bytes written/read. */ - ssize_t n_bytes; - - /** length of the block to read or write */ - ulint len; -#else - /** length of the block to read or write */ - ulint len; - - /** bytes written/read. */ - ulint n_bytes; -#endif /* WIN_ASYNC_IO */ - - /** Length of the block before it was compressed */ - uint32 original_len; - -}; - -/** The asynchronous i/o array structure */ -class AIO { +/* Per-IO operation environment*/ +class io_slots +{ +private: + tpool::cache<tpool::aiocb> m_cache; + tpool::task_group m_group; public: - /** Constructor - @param[in] id Latch ID - @param[in] n_slots Number of slots to configure - @param[in] segments Number of segments to configure */ - AIO(latch_id_t id, ulint n_slots, ulint segments); - - /** Destructor */ - ~AIO(); - - /** Initialize the instance - @return DB_SUCCESS or error code */ - dberr_t init(); - - /** Requests for a slot in the aio array. If no slot is available, waits - until not_full-event becomes signaled. 
- - @param[in] type IO context - @param[in,out] m1 message to be passed along with the AIO - operation - @param[in,out] m2 message to be passed along with the AIO - operation - @param[in] file file handle - @param[in] name name of the file or path as a null-terminated - string - @param[in,out] buf buffer where to read or from which to write - @param[in] offset file offset, where to read from or start writing - @param[in] len length of the block to read or write - @return pointer to slot */ - Slot* reserve_slot( - const IORequest& type, - fil_node_t* m1, - void* m2, - pfs_os_file_t file, - const char* name, - void* buf, - os_offset_t offset, - ulint len) - MY_ATTRIBUTE((warn_unused_result)); - - /** @return number of reserved slots */ - ulint pending_io_count() const; - - /** Returns a pointer to the nth slot in the aio array. - @param[in] index Index of the slot in the array - @return pointer to slot */ - const Slot* at(ulint i) const - MY_ATTRIBUTE((warn_unused_result)) + io_slots(int max_submitted_io, int max_callback_concurrency) : + m_cache(max_submitted_io), + m_group(max_callback_concurrency) { - ut_a(i < m_slots.size()); - - return(&m_slots[i]); } - - /** Non const version */ - Slot* at(ulint i) - MY_ATTRIBUTE((warn_unused_result)) + /* Get cached AIO control block */ + tpool::aiocb* acquire() { - ut_a(i < m_slots.size()); - - return(&m_slots[i]); + return m_cache.get(); } - - /** Frees a slot in the AIO array, assumes caller owns the mutex. - @param[in,out] slot Slot to release */ - void release(Slot* slot); - - /** Frees a slot in the AIO array, assumes caller doesn't own the mutex. - @param[in,out] slot Slot to release */ - void release_with_mutex(Slot* slot); - - /** Prints info about the aio array. - @param[in,out] file Where to print */ - void print(FILE* file); - - /** @return the number of slots per segment */ - ulint slots_per_segment() const - MY_ATTRIBUTE((warn_unused_result)) - { - return(m_slots.size() / m_n_segments); - } - - /** @return accessor for n_segments */ - ulint get_n_segments() const - MY_ATTRIBUTE((warn_unused_result)) + /* Release AIO control block back to cache */ + void release(tpool::aiocb* aiocb) { - return(m_n_segments); + m_cache.put(aiocb); } -#ifdef UNIV_DEBUG - /** @return true if the thread owns the mutex */ - bool is_mutex_owned() const - MY_ATTRIBUTE((warn_unused_result)) + bool contains(tpool::aiocb* aiocb) { - return(mutex_own(&m_mutex)); + return m_cache.contains(aiocb); } -#endif /* UNIV_DEBUG */ - /** Acquire the mutex */ - void acquire() const + /* Wait for completions of all AIO operations */ + void wait() { - mutex_enter(&m_mutex); + m_cache.wait(); } - /** Release the mutex */ - void release() const - { - mutex_exit(&m_mutex); - } - - /** Write out the state to the file/stream - @param[in, out] file File to write to */ - void to_file(FILE* file) const; - -#ifdef LINUX_NATIVE_AIO - /** Dispatch an AIO request to the kernel. - @param[in,out] slot an already reserved slot - @return true on success. 
*/ - bool linux_dispatch(Slot* slot) - MY_ATTRIBUTE((warn_unused_result)); - - /** Accessor for an AIO event - @param[in] index Index into the array - @return the event at the index */ - io_event* io_events(ulint index) - MY_ATTRIBUTE((warn_unused_result)) + tpool::task_group* get_task_group() { - ut_a(index < m_events.size()); - - return(&m_events[index]); + return &m_group; } - /** Accessor for the AIO context - @param[in] segment Segment for which to get the context - @return the AIO context for the segment */ - io_context* io_ctx(ulint segment) - MY_ATTRIBUTE((warn_unused_result)) + ~io_slots() { - ut_ad(segment < get_n_segments()); - - return(m_aio_ctx[segment]); + wait(); } +}; - /** Creates an io_context for native linux AIO. - @param[in] max_events number of events - @param[out] io_ctx io_ctx to initialize. - @return true on success. */ - static bool linux_create_io_ctx(unsigned max_events, io_context_t* io_ctx) - MY_ATTRIBUTE((warn_unused_result)); +io_slots* read_slots; +io_slots* write_slots; +io_slots* ibuf_slots; - /** Checks if the system supports native linux aio. On some kernel - versions where native aio is supported it won't work on tmpfs. In such - cases we can't use native aio as it is not possible to mix simulated - and native aio. - @return true if supported, false otherwise. */ - static bool is_linux_native_aio_supported() - MY_ATTRIBUTE((warn_unused_result)); -#endif /* LINUX_NATIVE_AIO */ +/** Number of retries for partial I/O's */ +static const ulint NUM_RETRIES_ON_PARTIAL_IO = 10; -#ifdef WIN_ASYNC_IO - HANDLE m_completion_port; - /** Wake up all AIO threads in Windows native aio */ - static void wake_at_shutdown() { - AIO *all_arrays[] = {s_reads, s_writes, s_log, s_ibuf }; - for (size_t i = 0; i < array_elements(all_arrays); i++) { - AIO *a = all_arrays[i]; - if (a) { - PostQueuedCompletionStatus(a->m_completion_port, 0, - IOCP_SHUTDOWN_KEY, 0); - } - } - } -#endif /* WIN_ASYNC_IO */ +/* This specifies the file permissions InnoDB uses when it creates files in +Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to +my_umask */ -#ifdef _WIN32 - /** This function can be called if one wants to post a batch of reads - and prefers an I/O - handler thread to handle them all at once later.You - must call os_aio_simulated_wake_handler_threads later to ensure the - threads are not left sleeping! */ - static void simulated_put_read_threads_to_sleep(); +#ifndef _WIN32 +/** Umask for creating files */ +static ulint os_innodb_umask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; +#else +/** Umask for creating files */ +static ulint os_innodb_umask = 0; #endif /* _WIN32 */ - /** Create an instance using new(std::nothrow) - @param[in] id Latch ID - @param[in] n_slots The number of AIO request slots - @param[in] segments The number of segments - @return a new AIO instance */ - static AIO* create( - latch_id_t id, - ulint n_slots, - ulint segments) - MY_ATTRIBUTE((warn_unused_result)); - - /** Initializes the asynchronous io system. Creates one array each - for ibuf and log I/O. Also creates one array each for read and write - where each array is divided logically into n_readers and n_writers - respectively. The caller must create an i/o handler thread for each - segment in these arrays. This function also creates the sync array. 
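For orientation, here is a minimal sketch of how a submitter would use the io_slots cache declared above. Only acquire(), release(), get_task_group(), the read_slots/write_slots/ibuf_slots pointers and tpool::aio_opcode::AIO_PREAD appear in this diff; every other name (native_file_handle, m_fh, m_offset, m_buffer, m_len, m_callback, m_group, submit_io) is an assumption about the tpool interface, and io_callback is the completion helper added further down in this file.

/* Hypothetical submitter, for illustration only. */
static void submit_read_sketch(const tpool::native_file_handle& fh,
                               void* buf, unsigned len,
                               unsigned long long offset)
{
        /* Blocks until a slot is free; this is what bounds the number
        of in-flight reads to max_submitted_io. */
        tpool::aiocb* cb = read_slots->acquire();

        cb->m_fh = fh;                              /* assumed field */
        cb->m_opcode = tpool::aio_opcode::AIO_PREAD;
        cb->m_offset = offset;                      /* assumed field */
        cb->m_buffer = buf;                         /* assumed field */
        cb->m_len = len;                            /* assumed field */
        cb->m_callback = io_callback;               /* releases the slot */
        cb->m_group = read_slots->get_task_group(); /* assumed field */

        if (srv_thread_pool->submit_io(cb) != 0) {  /* assumed call */
                read_slots->release(cb);            /* submission failed */
        }
}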
- No I/O handler thread needs to be created for that - @param[in] n_per_seg maximum number of pending aio - operations allowed per segment - @param[in] n_readers number of reader threads - @param[in] n_writers number of writer threads - @param[in] n_slots_sync number of slots in the sync aio array - @return true if AIO sub-system was started successfully */ - static bool start( - ulint n_per_seg, - ulint n_readers, - ulint n_writers, - ulint n_slots_sync) - MY_ATTRIBUTE((warn_unused_result)); - - /** Free the AIO arrays */ - static void shutdown(); - - /** Print all the AIO segments - @param[in,out] file Where to print */ - static void print_all(FILE* file); - - /** Calculates local segment number and aio array from global - segment number. - @param[out] array AIO wait array - @param[in] segment global segment number - @return local segment number within the aio array */ - static ulint get_array_and_local_segment( - AIO** array, - ulint segment) - MY_ATTRIBUTE((warn_unused_result)); - - /** Select the IO slot array - @param[in,out] type Type of IO, READ or WRITE - @param[in] read_only true if running in read-only mode - @param[in] mode IO mode - @return slot array or NULL if invalid mode specified */ - static AIO* select_slot_array( - IORequest& type, - bool read_only, - ulint mode) - MY_ATTRIBUTE((warn_unused_result)); - - /** Calculates segment number for a slot. - @param[in] array AIO wait array - @param[in] slot slot in this array - @return segment number (which is the number used by, for example, - I/O handler threads) */ - static ulint get_segment_no_from_slot( - const AIO* array, - const Slot* slot) - MY_ATTRIBUTE((warn_unused_result)); - - /** Wakes up a simulated AIO I/O-handler thread if it has something - to do. - @param[in] global_segment the number of the segment in the - AIO arrays */ - static void wake_simulated_handler_thread(ulint global_segment); - - /** Check if it is a read request - @param[in] aio The AIO instance to check - @return true if the AIO instance is for reading. */ - static bool is_read(const AIO* aio) - MY_ATTRIBUTE((warn_unused_result)) - { - return(s_reads == aio); - } - - /** Wait on an event until no pending writes */ - static void wait_until_no_pending_writes() - { - os_event_wait(AIO::s_writes->m_is_empty); - } - - /** Print to file - @param[in] file File to write to */ - static void print_to_file(FILE* file); - - /** Check for pending IO. Gets the count and also validates the - data structures. - @return count of pending IO requests */ - static ulint total_pending_io_count(); - -private: - /** Initialise the slots - @return DB_SUCCESS or error code */ - dberr_t init_slots() - MY_ATTRIBUTE((warn_unused_result)); - - /** Wakes up a simulated AIO I/O-handler thread if it has something - to do for a local segment in the AIO array. - @param[in] global_segment the number of the segment in the - AIO arrays - @param[in] segment the local segment in the AIO array */ - void wake_simulated_handler_thread(ulint global_segment, ulint segment); - - /** Prints pending IO requests per segment of an aio array. - We probably don't need per segment statistics but they can help us - during development phase to see if the IO requests are being - distributed as expected. 
- @param[in,out] file file where to print - @param[in] segments pending IO array */ - void print_segment_info( - FILE* file, - const ulint* segments); - -#ifdef LINUX_NATIVE_AIO - /** Initialise the Linux native AIO data structures - @return DB_SUCCESS or error code */ - dberr_t init_linux_native_aio() - MY_ATTRIBUTE((warn_unused_result)); -#endif /* LINUX_NATIVE_AIO */ - -private: - typedef std::vector<Slot> Slots; - - /** the mutex protecting the aio array */ - mutable SysMutex m_mutex; - - /** Pointer to the slots in the array. - Number of elements must be divisible by n_threads. */ - Slots m_slots; - - /** Number of segments in the aio array of pending aio requests. - A thread can wait separately for any one of the segments. */ - ulint m_n_segments; - - /** The event which is set to the signaled state when - there is space in the aio outside the ibuf segment; - os_event_set() and os_event_reset() are protected by AIO::m_mutex */ - os_event_t m_not_full; - - /** The event which is set to the signaled state when - there are no pending i/os in this array; - os_event_set() and os_event_reset() are protected by AIO::m_mutex */ - os_event_t m_is_empty; - - /** Number of reserved slots in the AIO array outside - the ibuf segment */ - ulint m_n_reserved; - - -#if defined(LINUX_NATIVE_AIO) - typedef std::vector<io_event> IOEvents; - - /** completion queue for IO. There is one such queue per - segment. Each thread will work on one ctx exclusively. */ - io_context_t* m_aio_ctx; - - /** The array to collect completed IOs. There is one such - event for each possible pending IO. The size of the array - is equal to m_slots.size(). */ - IOEvents m_events; -#endif /* LINUX_NATIV_AIO */ - - /** The aio arrays for non-ibuf i/o and ibuf i/o, as well as - sync AIO. These are NULL when the module has not yet been - initialized. */ - - /** Insert buffer */ - static AIO* s_ibuf; - /** Redo log */ - static AIO* s_log; - - /** Reads */ - static AIO* s_reads; - - /** Writes */ - static AIO* s_writes; - - /** Synchronous I/O */ - static AIO* s_sync; -}; - -/** Static declarations */ -AIO* AIO::s_reads; -AIO* AIO::s_writes; -AIO* AIO::s_ibuf; -AIO* AIO::s_log; -AIO* AIO::s_sync; - -#if defined(LINUX_NATIVE_AIO) -/** timeout for each io_getevents() call = 500ms. */ -static const ulint OS_AIO_REAP_TIMEOUT = 500000000UL; - -/** time to sleep, in microseconds if io_setup() returns EAGAIN. */ -static const ulint OS_AIO_IO_SETUP_RETRY_SLEEP = 500000UL; - -/** number of attempts before giving up on io_setup(). */ -static const int OS_AIO_IO_SETUP_RETRY_ATTEMPTS = 5; -#endif /* LINUX_NATIVE_AIO */ - -/** Array of events used in simulated AIO */ -static os_event_t* os_aio_segment_wait_events; +/** Flag indicating if the page_cleaner is in active state. */ +extern bool buf_page_cleaner_is_active; -/** Number of asynchronous I/O segments. Set by os_aio_init(). */ -static ulint os_aio_n_segments = ULINT_UNDEFINED; +#ifdef WITH_INNODB_DISALLOW_WRITES +#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event) +#else +#define WAIT_ALLOW_WRITES() do { } while (0) +#endif /* WITH_INNODB_DISALLOW_WRITES */ -/** If the following is true, read i/o handler threads try to -wait until a batch of new read requests have been posted */ -static bool os_aio_recommend_sleep_for_read_threads; ulint os_n_file_reads; static ulint os_bytes_read_since_printout; @@ -688,11 +168,12 @@ bool os_has_said_disk_full; /** Default Zip compression level */ extern uint page_zip_level; -/** Validates the consistency of the aio system. 
-@return true if ok */ -static -bool -os_aio_validate(); +#ifdef UNIV_PFS_IO +/* Keys to register InnoDB I/O with performance schema */ +mysql_pfs_key_t innodb_data_file_key; +mysql_pfs_key_t innodb_log_file_key; +mysql_pfs_key_t innodb_temp_file_key; +#endif /** Handle errors for file operations. @param[in] name name of a file or NULL @@ -756,30 +237,8 @@ static void os_file_handle_rename_error(const char* name, const char* new_name) } } -/** Does simulated AIO. This function should be called by an i/o-handler -thread. - -@param[in] segment The number of the segment in the aio arrays to wait - for; segment 0 is the ibuf i/o thread, segment 1 the - log i/o thread, then follow the non-ibuf read threads, - and as the last are the non-ibuf write threads -@param[out] m1 the messages passed with the AIO request; note that - also in the case where the AIO operation failed, these - output parameters are valid and can be used to restart - the operation, for example -@param[out] m2 Callback argument -@param[in] type IO context -@return DB_SUCCESS or error code */ -static -dberr_t -os_aio_simulated_handler( - ulint global_segment, - fil_node_t** m1, - void** m2, - IORequest* type); #ifdef _WIN32 -static HANDLE win_get_syncio_event(); /** Wrapper around Windows DeviceIoControl() function. @@ -803,7 +262,7 @@ os_win32_device_io_control( ) { OVERLAPPED overlapped = { 0 }; - overlapped.hEvent = win_get_syncio_event(); + overlapped.hEvent = tpool::win_get_syncio_event(); BOOL result = DeviceIoControl(handle, code, inbuf, inbuf_size, outbuf, outbuf_size, NULL, &overlapped); @@ -817,47 +276,7 @@ os_win32_device_io_control( #endif -#ifdef WIN_ASYNC_IO -/** This function is only used in Windows asynchronous i/o. -Waits for an aio operation to complete. This function is used to wait the -for completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! -@param[in] segment The number of the segment in the aio arrays to -wait for; segment 0 is the ibuf I/O thread, -segment 1 the log I/O thread, then follow the -non-ibuf read threads, and as the last are the -non-ibuf write threads; if this is -ULINT_UNDEFINED, then it means that sync AIO -is used, and this parameter is ignored -@param[in] pos this parameter is used only in sync AIO: -wait for the aio slot at this position -@param[out] m1 the messages passed with the AIO request; note -that also in the case where the AIO operation -failed, these output parameters are valid and -can be used to restart the operation, -for example -@param[out] m2 callback message -@param[out] type OS_FILE_WRITE or ..._READ -@return DB_SUCCESS or error code */ -static -dberr_t -os_aio_windows_handler( - ulint segment, - ulint pos, - fil_node_t** m1, - void** m2, - IORequest* type); -#endif /* WIN_ASYNC_IO */ -/** Generic AIO Handler methods. Currently handles IO post processing. */ -class AIOHandler { -public: - /** Do any post processing after a read/write - @return DB_SUCCESS or error code. */ - static dberr_t post_io_processing(Slot* slot); -}; /** Helper class for doing synchronous file IO. 
Currently, the objective is to hide the OS specific code, so that the higher level functions aren't @@ -890,10 +309,7 @@ public: @return the number of bytes read/written or negative value on error */ ssize_t execute(const IORequest& request); - /** Do the read/write - @param[in,out] slot The IO slot, it has the IO context - @return the number of bytes read/written or negative value on error */ - static ssize_t execute(Slot* slot); + /** Move the read/write offset up to where the partial IO succeeded. @param[in] n_bytes The number of bytes to advance */ @@ -922,67 +338,6 @@ private: os_offset_t m_offset; }; -/** Do any post processing after a read/write -@return DB_SUCCESS or error code. */ -dberr_t -AIOHandler::post_io_processing(Slot* slot) -{ - ut_ad(slot->is_reserved); - - /* Total bytes read so far */ - ulint n_bytes = ulint(slot->ptr - slot->buf) + slot->n_bytes; - - return(n_bytes == slot->original_len ? DB_SUCCESS : DB_FAIL); -} - -/** Count the number of free slots -@return number of reserved slots */ -ulint -AIO::pending_io_count() const -{ - acquire(); - -#ifdef UNIV_DEBUG - ut_a(m_n_segments > 0); - ut_a(!m_slots.empty()); - - ulint count = 0; - - for (ulint i = 0; i < m_slots.size(); ++i) { - - const Slot& slot = m_slots[i]; - - if (slot.is_reserved) { - ++count; - ut_a(slot.len > 0); - } - } - - ut_a(m_n_reserved == count); -#endif /* UNIV_DEBUG */ - - ulint reserved = m_n_reserved; - - release(); - - return(reserved); -} - -#ifdef UNIV_DEBUG -/** Validates the consistency the aio system some of the time. -@return true if ok or the check was skipped */ -static -bool -os_aio_validate_skip() -{ -/** Try os_aio_validate() every this many times */ -# define OS_AIO_VALIDATE_SKIP 13 - - static Atomic_counter<uint32_t> os_aio_validate_count; - return (os_aio_validate_count++ % OS_AIO_VALIDATE_SKIP) || os_aio_validate(); -} -#endif /* UNIV_DEBUG */ - #undef USE_FILE_LOCK #ifndef _WIN32 /* On Windows, mandatory locking is used */ @@ -1026,101 +381,6 @@ os_file_lock( } #endif /* USE_FILE_LOCK */ -/** Calculates local segment number and aio array from global segment number. -@param[out] array aio wait array -@param[in] segment global segment number -@return local segment number within the aio array */ -ulint -AIO::get_array_and_local_segment( - AIO** array, - ulint segment) -{ - ulint local_segment; - ulint n_extra_segs = (srv_read_only_mode) ? 0 : 2; - - ut_a(segment < os_aio_n_segments); - - if (!srv_read_only_mode && segment < n_extra_segs) { - - /* We don't support ibuf/log IO during read only mode. */ - - if (segment == IO_IBUF_SEGMENT) { - - *array = s_ibuf; - - } else if (segment == IO_LOG_SEGMENT) { - - *array = s_log; - - } else { - *array = NULL; - } - - local_segment = 0; - - } else if (segment < s_reads->m_n_segments + n_extra_segs) { - - *array = s_reads; - local_segment = segment - n_extra_segs; - - } else { - *array = s_writes; - - local_segment = segment - - (s_reads->m_n_segments + n_extra_segs); - } - - return(local_segment); -} - -/** Frees a slot in the aio array. Assumes caller owns the mutex. 
-@param[in,out] slot Slot to release */ -void -AIO::release(Slot* slot) -{ - ut_ad(is_mutex_owned()); - - ut_ad(slot->is_reserved); - - slot->is_reserved = false; - - --m_n_reserved; - - if (m_n_reserved == m_slots.size() - 1) { - os_event_set(m_not_full); - } - - if (m_n_reserved == 0) { - os_event_set(m_is_empty); - } - -#if defined(LINUX_NATIVE_AIO) - - if (srv_use_native_aio) { - memset(&slot->control, 0x0, sizeof(slot->control)); - slot->ret = 0; - slot->n_bytes = 0; - } else { - /* These fields should not be used if we are not - using native AIO. */ - ut_ad(slot->n_bytes == 0); - ut_ad(slot->ret == 0); - } - -#endif /* WIN_ASYNC_IO */ -} - -/** Frees a slot in the AIO array. Assumes caller doesn't own the mutex. -@param[in,out] slot Slot to release */ -void -AIO::release_with_mutex(Slot* slot) -{ - acquire(); - - release(slot); - - release(); -} /** Create a temporary file. This function is like tmpfile(3), but the temporary file is created in the in the mysql server configuration @@ -1464,7 +724,7 @@ os_file_create_subdirs_if_needed( return(success ? DB_SUCCESS : DB_ERROR); } -#ifndef _WIN32 + /** Do the read/write @param[in] request The IO context and type @@ -1475,14 +735,24 @@ SyncFileIO::execute(const IORequest& request) ssize_t n_bytes; if (request.is_read()) { +#ifdef _WIN32 + n_bytes = tpool::pread(m_fh, m_buf, m_n, m_offset); +#else n_bytes = pread(m_fh, m_buf, m_n, m_offset); +#endif } else { ut_ad(request.is_write()); +#ifdef _WIN32 + n_bytes = tpool::pwrite(m_fh, m_buf, m_n, m_offset); +#else n_bytes = pwrite(m_fh, m_buf, m_n, m_offset); +#endif } return(n_bytes); } + +#ifndef _WIN32 /** Free storage space associated with a section of the file. @param[in] fh Open file handle @param[in] off Starting offset (SEEK_SET) @@ -1526,732 +796,7 @@ os_file_punch_hole_posix( return(DB_IO_NO_PUNCH_HOLE); } -#if defined(LINUX_NATIVE_AIO) - -/** Linux native AIO handler */ -class LinuxAIOHandler { -public: - /** - @param[in] global_segment The global segment*/ - LinuxAIOHandler(ulint global_segment) - : - m_global_segment(global_segment) - { - /* Should never be doing Sync IO here. */ - ut_a(m_global_segment != ULINT_UNDEFINED); - - /* Find the array and the local segment. */ - - m_segment = AIO::get_array_and_local_segment( - &m_array, m_global_segment); - m_n_slots = m_array->slots_per_segment(); - } - - /** Destructor */ - ~LinuxAIOHandler() - { - // No op - } - - /** - Process a Linux AIO request - @param[out] m1 the messages passed with the - @param[out] m2 AIO request; note that in case the - AIO operation failed, these output - parameters are valid and can be used to - restart the operation. - @param[out] request IO context - @return DB_SUCCESS or error code */ - dberr_t poll(fil_node_t** m1, void** m2, IORequest* request); - -private: - /** Resubmit an IO request that was only partially successful - @param[in,out] slot Request to resubmit - @return DB_SUCCESS or DB_FAIL if the IO resubmit request failed */ - dberr_t resubmit(Slot* slot); - - /** Check if the AIO succeeded - @param[in,out] slot The slot to check - @return DB_SUCCESS, DB_FAIL if the operation should be retried or - DB_IO_ERROR on all other errors */ - dberr_t check_state(Slot* slot); - - /** @return true if a shutdown was detected */ - bool is_shutdown() const - { - return(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS - && !buf_page_cleaner_is_active); - } - - /** If no slot was found then the m_array->m_mutex will be released. 
- @param[out] n_pending The number of pending IOs - @return NULL or a slot that has completed IO */ - Slot* find_completed_slot(ulint* n_pending); - - /** This is called from within the IO-thread. If there are no completed - IO requests in the slot array, the thread calls this function to - collect more requests from the Linux kernel. - The IO-thread waits on io_getevents(), which is a blocking call, with - a timeout value. Unless the system is very heavy loaded, keeping the - IO-thread very busy, the io-thread will spend most of its time waiting - in this function. - The IO-thread also exits in this function. It checks server status at - each wakeup and that is why we use timed wait in io_getevents(). */ - void collect(); - -private: - /** Slot array */ - AIO* m_array; - - /** Number of slots inthe local segment */ - ulint m_n_slots; - - /** The local segment to check */ - ulint m_segment; - - /** The global segment */ - ulint m_global_segment; -}; - -/** Resubmit an IO request that was only partially successful -@param[in,out] slot Request to resubmit -@return DB_SUCCESS or DB_FAIL if the IO resubmit request failed */ -dberr_t -LinuxAIOHandler::resubmit(Slot* slot) -{ -#ifdef UNIV_DEBUG - /* Bytes already read/written out */ - ulint n_bytes = slot->ptr - slot->buf; - - ut_ad(m_array->is_mutex_owned()); - - ut_ad(n_bytes < slot->original_len); - ut_ad(static_cast<ulint>(slot->n_bytes) < slot->original_len - n_bytes); - /* Partial read or write scenario */ - ut_ad(slot->len >= static_cast<ulint>(slot->n_bytes)); -#endif /* UNIV_DEBUG */ - - slot->len -= slot->n_bytes; - slot->ptr += slot->n_bytes; - slot->offset += slot->n_bytes; - - /* Resetting the bytes read/written */ - slot->n_bytes = 0; - slot->io_already_done = false; - - compile_time_assert(sizeof(off_t) >= sizeof(os_offset_t)); - - struct iocb* iocb = &slot->control; - - if (slot->type.is_read()) { - - io_prep_pread( - iocb, - slot->file, - slot->ptr, - slot->len, - slot->offset); - } else { - - ut_a(slot->type.is_write()); - - io_prep_pwrite( - iocb, - slot->file, - slot->ptr, - slot->len, - slot->offset); - } - - iocb->data = slot; - - /* Resubmit an I/O request */ - int ret = io_submit(m_array->io_ctx(m_segment), 1, &iocb); - srv_stats.buffered_aio_submitted.inc(); - - if (ret < -1) { - errno = -ret; - } - - return(ret < 0 ? DB_IO_PARTIAL_FAILED : DB_SUCCESS); -} - -/** Check if the AIO succeeded -@param[in,out] slot The slot to check -@return DB_SUCCESS, DB_FAIL if the operation should be retried or - DB_IO_ERROR on all other errors */ -dberr_t -LinuxAIOHandler::check_state(Slot* slot) -{ - ut_ad(m_array->is_mutex_owned()); - - /* Note that it may be that there is more then one completed - IO requests. We process them one at a time. We may have a case - here to improve the performance slightly by dealing with all - requests in one sweep. */ - - srv_set_io_thread_op_info( - m_global_segment, "processing completed aio requests"); - - ut_ad(slot->io_already_done); - - dberr_t err = DB_SUCCESS; - - if (slot->ret == 0) { - - err = AIOHandler::post_io_processing(slot); - - } else { - errno = -slot->ret; - - /* os_file_handle_error does tell us if we should retry - this IO. As it stands now, we don't do this retry when - reaping requests from a different context than - the dispatcher. This non-retry logic is the same for - Windows and Linux native AIO. - We should probably look into this to transparently - re-submit the IO. 
*/ - os_file_handle_error(slot->name, "Linux aio"); - - err = DB_IO_ERROR; - } - - return(err); -} - -/** If no slot was found then the m_array->m_mutex will be released. -@param[out] n_pending The number of pending IOs -@return NULL or a slot that has completed IO */ -Slot* -LinuxAIOHandler::find_completed_slot(ulint* n_pending) -{ - ulint offset = m_n_slots * m_segment; - - *n_pending = 0; - - m_array->acquire(); - - Slot* slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved) { - - ++*n_pending; - - if (slot->io_already_done) { - - /* Something for us to work on. - Note: We don't release the mutex. */ - return(slot); - } - } - } - - m_array->release(); - - return(NULL); -} - -/** This function is only used in Linux native asynchronous i/o. This is -called from within the io-thread. If there are no completed IO requests -in the slot array, the thread calls this function to collect more -requests from the kernel. -The io-thread waits on io_getevents(), which is a blocking call, with -a timeout value. Unless the system is very heavy loaded, keeping the -io-thread very busy, the io-thread will spend most of its time waiting -in this function. -The io-thread also exits in this function. It checks server status at -each wakeup and that is why we use timed wait in io_getevents(). */ -void -LinuxAIOHandler::collect() -{ - ut_ad(m_n_slots > 0); - ut_ad(m_array != NULL); - ut_ad(m_segment < m_array->get_n_segments()); - - /* Which io_context we are going to use. */ - io_context* io_ctx = m_array->io_ctx(m_segment); - - /* Starting point of the m_segment we will be working on. */ - ulint start_pos = m_segment * m_n_slots; - - /* End point. */ - ulint end_pos = start_pos + m_n_slots; - - for (;;) { - struct io_event* events; - - /* Which part of event array we are going to work on. */ - events = m_array->io_events(m_segment * m_n_slots); - - /* Initialize the events. */ - memset(events, 0, sizeof(*events) * m_n_slots); - - /* The timeout value is arbitrary. We probably need - to experiment with it a little. */ - struct timespec timeout; - - timeout.tv_sec = 0; - timeout.tv_nsec = OS_AIO_REAP_TIMEOUT; - - int ret; - - ret = io_getevents(io_ctx, 1, m_n_slots, events, &timeout); - - for (int i = 0; i < ret; ++i) { - - struct iocb* iocb; - - iocb = reinterpret_cast<struct iocb*>(events[i].obj); - ut_a(iocb != NULL); - - Slot* slot = reinterpret_cast<Slot*>(iocb->data); - - /* Some sanity checks. */ - ut_a(slot != NULL); - ut_a(slot->is_reserved); - - /* We are not scribbling previous segment. */ - ut_a(slot->pos >= start_pos); - - /* We have not overstepped to next segment. */ - ut_a(slot->pos < end_pos); - - /* Deallocate unused blocks from file system. - This is newer done to page 0 or to log files.*/ - if (slot->offset > 0 - && !slot->type.is_log() - && slot->type.is_write() - && slot->type.punch_hole()) { - - slot->err = slot->type.punch_hole( - slot->file, - slot->offset, slot->len); - } else { - slot->err = DB_SUCCESS; - } - - /* Mark this request as completed. The error handling - will be done in the calling function. */ - m_array->acquire(); - - /* events[i].res2 should always be ZERO */ - ut_ad(events[i].res2 == 0); - slot->io_already_done = true; - - /*Even though events[i].res is an unsigned number - in libaio, it is used to return a negative value - (negated errno value) to indicate error and a positive - value to indicate number of bytes read or written. 
*/ - - if (events[i].res > slot->len) { - /* failure */ - slot->n_bytes = 0; - slot->ret = events[i].res; - } else { - /* success */ - slot->n_bytes = events[i].res; - slot->ret = 0; - } - m_array->release(); - } - - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS - || !buf_page_cleaner_is_active - || ret > 0) { - - break; - } - - /* This error handling is for any error in collecting the - IO requests. The errors, if any, for any particular IO - request are simply passed on to the calling routine. */ - - switch (ret) { - case -EAGAIN: - /* Not enough resources! Try again. */ - - case -EINTR: - /* Interrupted! The behaviour in case of an interrupt. - If we have some completed IOs available then the - return code will be the number of IOs. We get EINTR - only if there are no completed IOs and we have been - interrupted. */ - - case 0: - /* No pending request! Go back and check again. */ - - continue; - } - - /* All other errors should cause a trap for now. */ - ib::fatal() - << "Unexpected ret_code[" << ret - << "] from io_getevents()!"; - - break; - } -} - -/** Process a Linux AIO request -@param[out] m1 the messages passed with the -@param[out] m2 AIO request; note that in case the - AIO operation failed, these output - parameters are valid and can be used to - restart the operation. -@param[out] request IO context -@return DB_SUCCESS or error code */ -dberr_t -LinuxAIOHandler::poll(fil_node_t** m1, void** m2, IORequest* request) -{ - dberr_t err = DB_SUCCESS; - Slot* slot; - - /* Loop until we have found a completed request. */ - for (;;) { - - ulint n_pending; - - slot = find_completed_slot(&n_pending); - - if (slot != NULL) { - - ut_ad(m_array->is_mutex_owned()); - - err = check_state(slot); - - /* DB_FAIL is not a hard error, we should retry */ - if (err != DB_FAIL) { - break; - } - - /* Partial IO, resubmit request for - remaining bytes to read/write */ - err = resubmit(slot); - - if (err != DB_SUCCESS) { - break; - } - - m_array->release(); - - } else if (is_shutdown() && n_pending == 0) { - - /* There is no completed request. If there is - no pending request at all, and the system is - being shut down, exit. */ - - *m1 = NULL; - *m2 = NULL; - - return(DB_SUCCESS); - - } else { - - /* Wait for some request. Note that we return - from wait if we have found a request. */ - - srv_set_io_thread_op_info( - m_global_segment, - "waiting for completed aio requests"); - - collect(); - } - } - - if (err == DB_IO_PARTIAL_FAILED) { - /* Aborting in case of submit failure */ - ib::fatal() - << "Native Linux AIO interface. " - "io_submit() call failed when " - "resubmitting a partial I/O " - "request on the file " << slot->name - << "."; - } - - *m1 = slot->m1; - *m2 = slot->m2; - - *request = slot->type; - - m_array->release(slot); - - m_array->release(); - - return(err); -} - -/** This function is only used in Linux native asynchronous i/o. -Waits for an aio operation to complete. This function is used to wait for -the completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! - -@param[in] global_seg segment number in the aio array - to wait for; segment 0 is the ibuf - i/o thread, segment 1 is log i/o thread, - then follow the non-ibuf read threads, - and the last are the non-ibuf write - threads. 
-@param[out] m1 the messages passed with the -@param[out] m2 AIO request; note that in case the - AIO operation failed, these output - parameters are valid and can be used to - restart the operation. -@param[out]xi request IO context -@return DB_SUCCESS if the IO was successful */ -static -dberr_t -os_aio_linux_handler( - ulint global_segment, - fil_node_t** m1, - void** m2, - IORequest* request) -{ - return LinuxAIOHandler(global_segment).poll(m1, m2, request); -} - -/** Dispatch an AIO request to the kernel. -@param[in,out] slot an already reserved slot -@return true on success. */ -bool -AIO::linux_dispatch(Slot* slot) -{ - ut_a(slot->is_reserved); - ut_ad(slot->type.validate()); - - /* Find out what we are going to work with. - The iocb struct is directly in the slot. - The io_context is one per segment. */ - - ulint io_ctx_index; - struct iocb* iocb = &slot->control; - - io_ctx_index = (slot->pos * m_n_segments) / m_slots.size(); - - int ret = io_submit(m_aio_ctx[io_ctx_index], 1, &iocb); - srv_stats.buffered_aio_submitted.inc(); - - /* io_submit() returns number of successfully queued requests - or -errno. */ - - if (ret != 1) { - errno = -ret; - } - - return(ret == 1); -} - -/** Creates an io_context for native linux AIO. -@param[in] max_events number of events -@param[out] io_ctx io_ctx to initialize. -@return true on success. */ -bool -AIO::linux_create_io_ctx( - unsigned max_events, - io_context_t* io_ctx) -{ - ssize_t n_retries = 0; - - for (;;) { - - memset(io_ctx, 0x0, sizeof(*io_ctx)); - - /* Initialize the io_ctx. Tell it how many pending - IO requests this context will handle. */ - - int ret = io_setup(max_events, io_ctx); - - if (ret == 0) { - /* Success. Return now. */ - return(true); - } - - /* If we hit EAGAIN we'll make a few attempts before failing. */ - - switch (ret) { - case -EAGAIN: - if (n_retries == 0) { - /* First time around. */ - ib::warn() - << "io_setup() failed with EAGAIN." - " Will make " - << OS_AIO_IO_SETUP_RETRY_ATTEMPTS - << " attempts before giving up."; - } - - if (n_retries < OS_AIO_IO_SETUP_RETRY_ATTEMPTS) { - - ++n_retries; - - ib::warn() - << "io_setup() attempt " - << n_retries << "."; - - os_thread_sleep(OS_AIO_IO_SETUP_RETRY_SLEEP); - - continue; - } - - /* Have tried enough. Better call it a day. */ - ib::error() - << "io_setup() failed with EAGAIN after " - << OS_AIO_IO_SETUP_RETRY_ATTEMPTS - << " attempts."; - break; - - case -ENOSYS: - ib::error() - << "Linux Native AIO interface" - " is not supported on this platform. Please" - " check your OS documentation and install" - " appropriate binary of InnoDB."; - - break; - - default: - ib::error() - << "Linux Native AIO setup" - << " returned following error[" - << ret << "]"; - break; - } - - ib::info() - << "You can disable Linux Native AIO by" - " setting innodb_use_native_aio = 0 in my.cnf"; - - break; - } - - return(false); -} - -/** Checks if the system supports native linux aio. On some kernel -versions where native aio is supported it won't work on tmpfs. In such -cases we can't use native aio as it is not possible to mix simulated -and native aio. -@return: true if supported, false otherwise. */ -bool -AIO::is_linux_native_aio_supported() -{ - File fd; - io_context_t io_ctx; - char name[1000]; - - if (!linux_create_io_ctx(1, &io_ctx)) { - - /* The platform does not support native aio. */ - - return(false); - - } else if (!srv_read_only_mode) { - - /* Now check if tmpdir supports native aio ops. 
*/ - fd = mysql_tmpfile("ib"); - - if (fd < 0) { - ib::warn() - << "Unable to create temp file to check" - " native AIO support."; - - return(false); - } - } else { - - os_normalize_path(srv_log_group_home_dir); - - ulint dirnamelen = strlen(srv_log_group_home_dir); - - ut_a(dirnamelen < (sizeof name) - 10 - sizeof "ib_logfile"); - - memcpy(name, srv_log_group_home_dir, dirnamelen); - - /* Add a path separator if needed. */ - if (dirnamelen && name[dirnamelen - 1] != OS_PATH_SEPARATOR) { - - name[dirnamelen++] = OS_PATH_SEPARATOR; - } - - strcpy(name + dirnamelen, "ib_logfile0"); - - fd = my_open(name, O_RDONLY | O_CLOEXEC, MYF(0)); - - if (fd == -1) { - - ib::warn() - << "Unable to open" - << " \"" << name << "\" to check native" - << " AIO read support."; - - return(false); - } - } - - struct io_event io_event; - - memset(&io_event, 0x0, sizeof(io_event)); - - byte* buf = static_cast<byte*>(ut_malloc_nokey(srv_page_size * 2)); - byte* ptr = static_cast<byte*>(ut_align(buf, srv_page_size)); - - struct iocb iocb; - - /* Suppress valgrind warning. */ - memset(buf, 0x00, srv_page_size * 2); - memset(&iocb, 0x0, sizeof(iocb)); - - struct iocb* p_iocb = &iocb; - - if (!srv_read_only_mode) { - - io_prep_pwrite(p_iocb, fd, ptr, srv_page_size, 0); - - } else { - ut_a(srv_page_size >= 512); - io_prep_pread(p_iocb, fd, ptr, 512, 0); - } - - int err = io_submit(io_ctx, 1, &p_iocb); - srv_stats.buffered_aio_submitted.inc(); - - if (err >= 1) { - /* Now collect the submitted IO request. */ - err = io_getevents(io_ctx, 1, 1, &io_event, NULL); - } - - ut_free(buf); - my_close(fd, MYF(MY_WME)); - - switch (err) { - case 1: - return(true); - - case -EINVAL: - case -ENOSYS: - ib::error() - << "Linux Native AIO not supported. You can either" - " move " - << (srv_read_only_mode ? name : "tmpdir") - << " to a file system that supports native" - " AIO or you can set innodb_use_native_aio to" - " FALSE to avoid this message."; - - /* fall through. */ - default: - ib::error() - << "Linux Native AIO check on " - << (srv_read_only_mode ? name : "tmpdir") - << "returned error[" << -err << "]"; - } - - return(false); -} - -#endif /* LINUX_NATIVE_AIO */ /** Retrieves the last error number if an error occurs in a file io function. The number should be retrieved before any other OS calls (because they may @@ -3283,127 +1828,6 @@ os_file_set_eof( #include <WinIoCtl.h> -/* -Windows : Handling synchronous IO on files opened asynchronously. - -If file is opened for asynchronous IO (FILE_FLAG_OVERLAPPED) and also bound to -a completion port, then every IO on this file would normally be enqueued to the -completion port. Sometimes however we would like to do a synchronous IO. This is -possible if we initialitze have overlapped.hEvent with a valid event and set its -lowest order bit to 1 (see MSDN ReadFile and WriteFile description for more info) - -We'll create this special event once for each thread and store in thread local -storage. 
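Collapsed into one self-contained function, the technique described above reads roughly as follows (a sketch only; the actual code below caches the event per thread through fiber-local storage instead of creating one per call):

/* Sketch: a synchronous read on a file opened with FILE_FLAG_OVERLAPPED,
without the completion being queued to the IO completion port. */
static ssize_t win_sync_pread_sketch(HANDLE fh, void* buf, DWORD n,
                                     unsigned long long offset)
{
        OVERLAPPED ov = {};
        HANDLE h = CreateEventA(NULL, FALSE, FALSE, NULL);

        /* With the low-order bit of hEvent set, the kernel signals the
        event but does not post a completion packet to the port. */
        ov.hEvent = (HANDLE)((uintptr_t)h | 1);
        ov.Offset = (DWORD)(offset & 0xFFFFFFFF);
        ov.OffsetHigh = (DWORD)(offset >> 32);

        DWORD n_bytes = 0;
        BOOL ret = ReadFile(fh, buf, n, NULL, &ov);

        if (ret || GetLastError() == ERROR_IO_PENDING) {
                /* Wait for the asynchronous read to complete. */
                ret = GetOverlappedResult(fh, &ov, &n_bytes, TRUE);
        }

        CloseHandle(h);
        return ret ? (ssize_t)n_bytes : -1;
}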
-*/ - - -static void __stdcall win_free_syncio_event(void *data) { - if (data) { - CloseHandle((HANDLE)data); - } -} - - -/* -Retrieve per-thread event for doing synchronous io on asyncronously opened files -*/ -static HANDLE win_get_syncio_event() -{ - HANDLE h; - - h = (HANDLE)FlsGetValue(fls_sync_io); - if (h) { - return h; - } - h = CreateEventA(NULL, FALSE, FALSE, NULL); - ut_a(h); - /* Set low-order bit to keeps I/O completion from being queued */ - h = (HANDLE)((uintptr_t)h | 1); - FlsSetValue(fls_sync_io, h); - return h; -} - - -/** Do the read/write -@param[in] request The IO context and type -@return the number of bytes read/written or negative value on error */ -ssize_t -SyncFileIO::execute(const IORequest& request) -{ - OVERLAPPED seek; - - memset(&seek, 0x0, sizeof(seek)); - - seek.hEvent = win_get_syncio_event(); - seek.Offset = (DWORD) m_offset & 0xFFFFFFFF; - seek.OffsetHigh = (DWORD) (m_offset >> 32); - - BOOL ret; - DWORD n_bytes; - - if (request.is_read()) { - ret = ReadFile(m_fh, m_buf, - static_cast<DWORD>(m_n), NULL, &seek); - - } else { - ut_ad(request.is_write()); - ret = WriteFile(m_fh, m_buf, - static_cast<DWORD>(m_n), NULL, &seek); - } - if (ret || (GetLastError() == ERROR_IO_PENDING)) { - /* Wait for async io to complete */ - ret = GetOverlappedResult(m_fh, &seek, &n_bytes, TRUE); - } - - return(ret ? static_cast<ssize_t>(n_bytes) : -1); -} - -/** Do the read/write -@param[in,out] slot The IO slot, it has the IO context -@return the number of bytes read/written or negative value on error */ -ssize_t -SyncFileIO::execute(Slot* slot) -{ - BOOL ret; - slot->control.hEvent = win_get_syncio_event(); - if (slot->type.is_read()) { - - ret = ReadFile( - slot->file, slot->ptr, slot->len, - NULL, &slot->control); - - } else { - ut_ad(slot->type.is_write()); - - ret = WriteFile( - slot->file, slot->ptr, slot->len, - NULL, &slot->control); - - } - if (ret || (GetLastError() == ERROR_IO_PENDING)) { - /* Wait for async io to complete */ - ret = GetOverlappedResult(slot->file, &slot->control, &slot->n_bytes, TRUE); - } - - return(ret ? static_cast<ssize_t>(slot->n_bytes) : -1); -} - -/* Startup/shutdown */ - -struct WinIoInit -{ - WinIoInit() { - fls_sync_io = FlsAlloc(win_free_syncio_event); - ut_a(fls_sync_io != FLS_OUT_OF_INDEXES); - } - - ~WinIoInit() { - FlsFree(fls_sync_io); - } -}; - -/* Ensures proper initialization and shutdown */ -static WinIoInit win_io_init; /** Free storage space associated with a section of the file. @@ -4167,18 +2591,9 @@ os_file_create_func( } } - if (*success && srv_use_native_aio && (attributes & FILE_FLAG_OVERLAPPED)) { - /* Bind the file handle to completion port. Completion port - might not be created yet, in some stages of backup, but - must always be there for the server.*/ - HANDLE port = (type == OS_LOG_FILE) ? - log_completion_port : data_completion_port; - ut_a(port || srv_operation != SRV_OPERATION_NORMAL); - if (port) { - ut_a(CreateIoCompletionPort(file, port, 0, 0)); - } + if (*success && (attributes & FILE_FLAG_OVERLAPPED) && srv_thread_pool) { + srv_thread_pool->bind(file); } - return(file); } @@ -4438,15 +2853,18 @@ bool os_file_close_func( os_file_t file) { - ut_a(file); + ut_a(file != 0); - if (CloseHandle(file)) { - return(true); - } - os_file_handle_error(NULL, "close"); + if (!CloseHandle(file)) { + os_file_handle_error(NULL, "close"); + return false; + } - return(false); + if(srv_thread_pool) + srv_thread_pool->unbind(file); + + return(true); } /** Gets a file size. 
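Pieced together from the hunks above, the handle lifecycle under the thread pool looks roughly like this; only srv_thread_pool->bind()/unbind() are taken from this diff, everything else is illustrative. The new os_file_close_func() unbinds only after CloseHandle() has succeeded, and the sketch preserves that order.

/* Sketch: an OVERLAPPED handle is attached to the thread pool's completion
port right after creation and detached around close. */
static HANDLE open_bound_sketch(const char* name)
{
        HANDLE h = CreateFileA(name, GENERIC_READ | GENERIC_WRITE,
                               FILE_SHARE_READ, NULL, OPEN_EXISTING,
                               FILE_FLAG_OVERLAPPED, NULL);
        if (h != INVALID_HANDLE_VALUE && srv_thread_pool) {
                srv_thread_pool->bind(h);   /* route completions to the pool */
        }
        return h;
}

static bool close_bound_sketch(HANDLE h)
{
        if (!CloseHandle(h)) {
                return false;
        }
        if (srv_thread_pool) {
                srv_thread_pool->unbind(h); /* mirror of bind() */
        }
        return true;
}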
@@ -4943,17 +3361,19 @@ os_file_read_page( if (ulint(n_bytes) == n || (err != DB_SUCCESS && !exit_on_err)) { return err; } - - ib::error() << "Tried to read " << n << " bytes at offset " - << offset << ", but was only able to read " << n_bytes; + int os_err = IF_WIN((int)GetLastError(), errno); if (!os_file_handle_error_cond_exit( NULL, "read", exit_on_err, false)) { ib::fatal() - << "Cannot read from file. OS error number " - << errno << "."; + << "Tried to read " << n << " bytes at offset " + << offset << ", but was only able to read " << n_bytes + << ".Cannot read from file. OS error number " + << os_err << "."; + } else { + ib::error() << "Tried to read " << n << " bytes at offset " + << offset << ", but was only able to read " << n_bytes; } - if (err == DB_SUCCESS) { err = DB_IO_ERROR; } @@ -5478,979 +3898,201 @@ os_file_get_status( return(ret); } -/** -Waits for an AIO operation to complete. This function is used to wait the -for completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! -@param[in] segment The number of the segment in the aio arrays to - wait for; segment 0 is the ibuf I/O thread, - segment 1 the log I/O thread, then follow the - non-ibuf read threads, and as the last are the - non-ibuf write threads; if this is - ULINT_UNDEFINED, then it means that sync AIO - is used, and this parameter is ignored -@param[out] m1 the messages passed with the AIO request; note - that also in the case where the AIO operation - failed, these output parameters are valid and - can be used to restart the operation, - for example -@param[out] m2 callback message -@param[out] type OS_FILE_WRITE or ..._READ -@return DB_SUCCESS or error code */ -dberr_t -os_aio_handler( - ulint segment, - fil_node_t** m1, - void** m2, - IORequest* request) -{ - dberr_t err; - - if (srv_use_native_aio) { - srv_set_io_thread_op_info(segment, "native aio handle"); - -#ifdef WIN_ASYNC_IO - - err = os_aio_windows_handler(segment, 0, m1, m2, request); - -#elif defined(LINUX_NATIVE_AIO) - - err = os_aio_linux_handler(segment, m1, m2, request); - -#else - ut_error; - - err = DB_ERROR; /* Eliminate compiler warning */ - -#endif /* WIN_ASYNC_IO */ - - } else { - srv_set_io_thread_op_info(segment, "simulated aio handle"); - - err = os_aio_simulated_handler(segment, m1, m2, request); - } - - return(err); -} - -#ifdef WIN_ASYNC_IO -static HANDLE new_completion_port() -{ - HANDLE h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); - ut_a(h); - return h; -} -#endif - -/** Constructor -@param[in] id The latch ID -@param[in] n Number of AIO slots -@param[in] segments Number of segments */ -AIO::AIO( - latch_id_t id, - ulint n, - ulint segments) - : -#ifdef WIN_ASYNC_IO - m_completion_port(new_completion_port()), -#endif - m_slots(n), - m_n_segments(segments), - m_n_reserved() -# ifdef LINUX_NATIVE_AIO - ,m_aio_ctx(), - m_events(m_slots.size()) -# endif /* LINUX_NATIVE_AIO */ -{ - ut_a(n > 0); - ut_a(m_n_segments > 0); - - mutex_create(id, &m_mutex); - - m_not_full = os_event_create("aio_not_full"); - m_is_empty = os_event_create("aio_is_empty"); - - memset((void*)&m_slots[0], 0x0, sizeof(m_slots[0]) * m_slots.size()); -#ifdef LINUX_NATIVE_AIO - memset(&m_events[0], 0x0, sizeof(m_events[0]) * m_events.size()); -#endif /* LINUX_NATIVE_AIO */ - - os_event_set(m_is_empty); -} - -/** Initialise the slots */ -dberr_t 
-AIO::init_slots() -{ - for (ulint i = 0; i < m_slots.size(); ++i) { - Slot& slot = m_slots[i]; - - slot.pos = static_cast<uint16_t>(i); - - slot.is_reserved = false; -#ifdef WIN_ASYNC_IO - - slot.array = this; - -#elif defined(LINUX_NATIVE_AIO) - - slot.ret = 0; - - slot.n_bytes = 0; - - memset(&slot.control, 0x0, sizeof(slot.control)); - -#endif /* WIN_ASYNC_IO */ - } - - return(DB_SUCCESS); -} +extern void fil_aio_callback(const tpool::aiocb *cb); -#ifdef LINUX_NATIVE_AIO -/** Initialise the Linux Native AIO interface */ -dberr_t -AIO::init_linux_native_aio() +static void io_callback(tpool::aiocb* cb) { - /* Initialize the io_context array. One io_context - per segment in the array. */ - - ut_a(m_aio_ctx == NULL); - - m_aio_ctx = static_cast<io_context**>( - ut_zalloc_nokey(m_n_segments * sizeof(*m_aio_ctx))); - - if (m_aio_ctx == NULL) { - return(DB_OUT_OF_MEMORY); - } + fil_aio_callback(cb); - io_context** ctx = m_aio_ctx; - ulint max_events = slots_per_segment(); - - for (ulint i = 0; i < m_n_segments; ++i, ++ctx) { - - if (!linux_create_io_ctx(max_events, ctx)) { - /* If something bad happened during aio setup - we disable linux native aio. - The disadvantage will be a small memory leak - at shutdown but that's ok compared to a crash - or a not working server. - This frequently happens when running the test suite - with many threads on a system with low fs.aio-max-nr! - */ - - ib::warn() - << "Warning: Linux Native AIO disabled " - << "because _linux_create_io_ctx() " - << "failed. To get rid of this warning you can " - << "try increasing system " - << "fs.aio-max-nr to 1048576 or larger or " - << "setting innodb_use_native_aio = 0 in my.cnf"; - ut_free(m_aio_ctx); - m_aio_ctx = 0; - srv_use_native_aio = FALSE; - return(DB_SUCCESS); + /* Return cb back to cache*/ + if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD) { + if (read_slots->contains(cb)) { + read_slots->release(cb); + } else { + ut_ad(ibuf_slots->contains(cb)); + ibuf_slots->release(cb); } + } else { + ut_ad(write_slots->contains(cb)); + write_slots->release(cb); } - - return(DB_SUCCESS); } -#endif /* LINUX_NATIVE_AIO */ - -/** Initialise the array */ -dberr_t -AIO::init() -{ - ut_a(!m_slots.empty()); - - if (srv_use_native_aio) { #ifdef LINUX_NATIVE_AIO - dberr_t err = init_linux_native_aio(); - - if (err != DB_SUCCESS) { - return(err); - } - -#endif /* LINUX_NATIVE_AIO */ - } - - return(init_slots()); -} - -/** Creates an aio wait array. Note that we return NULL in case of failure. -We don't care about freeing memory here because we assume that a -failure will result in server refusing to start up. 
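The three slot caches are presumably created during I/O subsystem startup along the lines below. Only the io_slots constructor arguments (maximum submitted IOs, maximum callback concurrency) and the OS_AIO_N_PENDING_IOS_PER_THREAD constant come from this file; the sizing and function names are illustrative.

static bool os_aio_init_sketch(ulint n_readers, ulint n_writers)
{
        int max_read_events = int(n_readers * OS_AIO_N_PENDING_IOS_PER_THREAD);
        int max_write_events = int(n_writers * OS_AIO_N_PENDING_IOS_PER_THREAD);

        read_slots = new io_slots(max_read_events, int(n_readers));
        write_slots = new io_slots(max_write_events, int(n_writers));
        ibuf_slots = new io_slots(max_read_events, 1);
        return true;
}

static void os_aio_free_sketch()
{
        /* ~io_slots() waits for the pending IOs of each cache. */
        delete read_slots;
        delete write_slots;
        delete ibuf_slots;
        read_slots = write_slots = ibuf_slots = nullptr;
}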
-@param[in] id Latch ID -@param[in] n maximum number of pending AIO operations - allowed; n must be divisible by m_n_segments -@param[in] n_segments number of segments in the AIO array -@return own: AIO array, NULL on failure */ -AIO* -AIO::create( - latch_id_t id, - ulint n, - ulint n_segments) -{ - if ((n % n_segments)) { - - ib::error() - << "Maximum number of AIO operations must be " - << "divisible by number of segments"; - - return(NULL); - } - - AIO* array = UT_NEW_NOKEY(AIO(id, n, n_segments)); - - if (array != NULL && array->init() != DB_SUCCESS) { - - UT_DELETE(array); - - array = NULL; - } - - return(array); -} - -/** AIO destructor */ -AIO::~AIO() -{ - mutex_destroy(&m_mutex); - - os_event_destroy(m_not_full); - os_event_destroy(m_is_empty); - -#if defined(LINUX_NATIVE_AIO) - if (srv_use_native_aio) { - m_events.clear(); - ut_free(m_aio_ctx); - } -#endif /* LINUX_NATIVE_AIO */ -#if defined(WIN_ASYNC_IO) - CloseHandle(m_completion_port); -#endif - - m_slots.clear(); -} +/** Checks if the system supports native linux aio. On some kernel +versions where native aio is supported it won't work on tmpfs. In such +cases we can't use native aio. -/** Initializes the asynchronous io system. Creates one array each for ibuf -and log i/o. Also creates one array each for read and write where each -array is divided logically into n_readers and n_writers -respectively. The caller must create an i/o handler thread for each -segment in these arrays. This function also creates the sync array. -No i/o handler thread needs to be created for that -@param[in] n_per_seg maximum number of pending aio - operations allowed per segment -@param[in] n_readers number of reader threads -@param[in] n_writers number of writer threads -@param[in] n_slots_sync number of slots in the sync aio array -@return true if the AIO sub-system was started successfully */ -bool -AIO::start( - ulint n_per_seg, - ulint n_readers, - ulint n_writers, - ulint n_slots_sync) +@return: true if supported, false otherwise. */ +#include <libaio.h> +static bool is_linux_native_aio_supported() { -#if defined(LINUX_NATIVE_AIO) - /* Check if native aio is supported on this system and tmpfs */ - if (srv_use_native_aio && !is_linux_native_aio_supported()) { - - ib::warn() << "Linux Native AIO disabled."; - - srv_use_native_aio = FALSE; - } -#endif /* LINUX_NATIVE_AIO */ + File fd; + io_context_t io_ctx; + char name[1000]; - srv_reset_io_thread_op_info(); + memset(&io_ctx, 0, sizeof(io_ctx)); + if (io_setup(1, &io_ctx)) { - s_reads = create( - LATCH_ID_OS_AIO_READ_MUTEX, n_readers * n_per_seg, n_readers); + /* The platform does not support native aio. */ - if (s_reads == NULL) { return(false); - } - - ulint start = srv_read_only_mode ? 0 : 2; - ulint n_segs = n_readers + start; - /* 0 is the ibuf segment and 1 is the redo log segment. */ - for (ulint i = start; i < n_segs; ++i) { - ut_a(i < SRV_MAX_N_IO_THREADS); - srv_io_thread_function[i] = "read thread"; } + else if (!srv_read_only_mode) { - ulint n_segments = n_readers; - - if (!srv_read_only_mode) { - - s_ibuf = create(LATCH_ID_OS_AIO_IBUF_MUTEX, n_per_seg, 1); - - if (s_ibuf == NULL) { - return(false); - } - - ++n_segments; - - srv_io_thread_function[0] = "insert buffer thread"; + /* Now check if tmpdir supports native aio ops. 
*/ + fd = mysql_tmpfile("ib"); - s_log = create(LATCH_ID_OS_AIO_LOG_MUTEX, n_per_seg, 1); + if (fd < 0) { + ib::warn() + << "Unable to create temp file to check" + " native AIO support."; - if (s_log == NULL) { return(false); } - - ++n_segments; - - srv_io_thread_function[1] = "log thread"; - - } else { - s_ibuf = s_log = NULL; - } - - s_writes = create( - LATCH_ID_OS_AIO_WRITE_MUTEX, n_writers * n_per_seg, n_writers); - - if (s_writes == NULL) { - return(false); } + else { -#ifdef WIN_ASYNC_IO - data_completion_port = s_writes->m_completion_port; - log_completion_port = - s_log ? s_log->m_completion_port : data_completion_port; -#endif - - n_segments += n_writers; - - for (ulint i = start + n_readers; i < n_segments; ++i) { - ut_a(i < SRV_MAX_N_IO_THREADS); - srv_io_thread_function[i] = "write thread"; - } - - ut_ad(n_segments >= static_cast<ulint>(srv_read_only_mode ? 2 : 4)); - - s_sync = create(LATCH_ID_OS_AIO_SYNC_MUTEX, n_slots_sync, 1); - - if (s_sync == NULL) { - - return(false); - } - - os_aio_n_segments = n_segments; - - os_aio_validate(); - - os_last_printout = time(NULL); - - if (srv_use_native_aio) { - return(true); - } - - os_aio_segment_wait_events = static_cast<os_event_t*>( - ut_zalloc_nokey( - n_segments * sizeof *os_aio_segment_wait_events)); - - if (os_aio_segment_wait_events == NULL) { - - return(false); - } - - for (ulint i = 0; i < n_segments; ++i) { - os_aio_segment_wait_events[i] = os_event_create(0); - } - - return(true); -} - -/** Free the AIO arrays */ -void -AIO::shutdown() -{ - UT_DELETE(s_ibuf); - s_ibuf = NULL; - - UT_DELETE(s_log); - s_log = NULL; - - UT_DELETE(s_writes); - s_writes = NULL; - - UT_DELETE(s_sync); - s_sync = NULL; - - UT_DELETE(s_reads); - s_reads = NULL; -} + os_normalize_path(srv_log_group_home_dir); -/** Initializes the asynchronous io system. Creates one array each for ibuf -and log i/o. Also creates one array each for read and write where each -array is divided logically into n_readers and n_writers -respectively. The caller must create an i/o handler thread for each -segment in these arrays. This function also creates the sync array. -No i/o handler thread needs to be created for that -@param[in] n_readers number of reader threads -@param[in] n_writers number of writer threads -@param[in] n_slots_sync number of slots in the sync aio array */ -bool -os_aio_init( - ulint n_readers, - ulint n_writers, - ulint n_slots_sync) -{ - /* Maximum number of pending aio operations allowed per segment */ - ulint limit = 8 * OS_AIO_N_PENDING_IOS_PER_THREAD; + ulint dirnamelen = strlen(srv_log_group_home_dir); - return(AIO::start(limit, n_readers, n_writers, n_slots_sync)); -} + ut_a(dirnamelen < (sizeof name) - 10 - sizeof "ib_logfile"); -/** Frees the asynchronous io system. */ -void -os_aio_free() -{ - AIO::shutdown(); + memcpy(name, srv_log_group_home_dir, dirnamelen); - ut_ad(!os_aio_segment_wait_events || !srv_use_native_aio); - ut_ad(srv_use_native_aio || os_aio_segment_wait_events - || !srv_was_started); + /* Add a path separator if needed. */ + if (dirnamelen && name[dirnamelen - 1] != OS_PATH_SEPARATOR) { - if (!srv_use_native_aio && os_aio_segment_wait_events) { - for (ulint i = 0; i < os_aio_n_segments; i++) { - os_event_destroy(os_aio_segment_wait_events[i]); + name[dirnamelen++] = OS_PATH_SEPARATOR; } - ut_free(os_aio_segment_wait_events); - os_aio_segment_wait_events = 0; - } - os_aio_n_segments = 0; -} - -/** Wakes up all async i/o threads so that they know to exit themselves in -shutdown. 
*/ -void -os_aio_wake_all_threads_at_shutdown() -{ -#ifdef WIN_ASYNC_IO - AIO::wake_at_shutdown(); -#elif defined(LINUX_NATIVE_AIO) - /* When using native AIO interface the io helper threads - wait on io_getevents with a timeout value of 500ms. At - each wake up these threads check the server status. - No need to do anything to wake them up. */ -#endif /* !WIN_ASYNC_AIO */ - - if (srv_use_native_aio) { - return; - } - - /* This loop wakes up all simulated ai/o threads */ - - for (ulint i = 0; i < os_aio_n_segments; ++i) { - - os_event_set(os_aio_segment_wait_events[i]); - } -} - -/** Waits until there are no pending writes in AIO::s_writes. There can -be other, synchronous, pending writes. */ -void -os_aio_wait_until_no_pending_writes() -{ - AIO::wait_until_no_pending_writes(); -} - -/** Calculates segment number for a slot. -@param[in] array AIO wait array -@param[in] slot slot in this array -@return segment number (which is the number used by, for example, - I/O-handler threads) */ -ulint -AIO::get_segment_no_from_slot( - const AIO* array, - const Slot* slot) -{ - ulint segment; - ulint seg_len; - - if (array == s_ibuf) { - ut_ad(!srv_read_only_mode); - - segment = IO_IBUF_SEGMENT; - - } else if (array == s_log) { - ut_ad(!srv_read_only_mode); - - segment = IO_LOG_SEGMENT; - - } else if (array == s_reads) { - seg_len = s_reads->slots_per_segment(); - - segment = (srv_read_only_mode ? 0 : 2) + slot->pos / seg_len; - } else { - ut_a(array == s_writes); - - seg_len = s_writes->slots_per_segment(); - - segment = s_reads->m_n_segments - + (srv_read_only_mode ? 0 : 2) + slot->pos / seg_len; - } - - return(segment); -} - -/** Requests for a slot in the aio array. If no slot is available, waits until -not_full-event becomes signaled. - -@param[in] type IO context -@param[in,out] m1 message to be passed along with the AIO - operation -@param[in,out] m2 message to be passed along with the AIO - operation -@param[in] file file handle -@param[in] name name of the file or path as a NUL-terminated - string -@param[in,out] buf buffer where to read or from which to write -@param[in] offset file offset, where to read from or start writing -@param[in] len length of the block to read or write -@return pointer to slot */ -Slot* -AIO::reserve_slot( - const IORequest& type, - fil_node_t* m1, - void* m2, - pfs_os_file_t file, - const char* name, - void* buf, - os_offset_t offset, - ulint len) -{ -#ifdef WIN_ASYNC_IO - ut_a((len & 0xFFFFFFFFUL) == len); -#endif /* WIN_ASYNC_IO */ - - /* No need of a mutex. Only reading constant fields */ - ulint slots_per_seg; - - ut_ad(type.validate()); - - slots_per_seg = slots_per_segment(); - - /* We attempt to keep adjacent blocks in the same local - segment. 
This can help in merging IO requests when we are - doing simulated AIO */ - ulint local_seg; - - local_seg = (offset >> (srv_page_size_shift + 6)) % m_n_segments; - - for (;;) { - - acquire(); + strcpy(name + dirnamelen, "ib_logfile0"); - if (m_n_reserved != m_slots.size()) { - break; - } + fd = my_open(name, O_RDONLY | O_CLOEXEC, MYF(0)); - release(); + if (fd == -1) { - if (!srv_use_native_aio) { - /* If the handler threads are suspended, - wake them so that we get more slots */ + ib::warn() + << "Unable to open" + << " \"" << name << "\" to check native" + << " AIO read support."; - os_aio_simulated_wake_handler_threads(); + return(false); } - - os_event_wait(m_not_full); } - ulint counter = 0; - Slot* slot = NULL; + struct io_event io_event; - /* We start our search for an available slot from our preferred - local segment and do a full scan of the array. We are - guaranteed to find a slot in full scan. */ - for (ulint i = local_seg * slots_per_seg; - counter < m_slots.size(); - ++i, ++counter) { + memset(&io_event, 0x0, sizeof(io_event)); - i %= m_slots.size(); + byte* buf = static_cast<byte*>(ut_malloc_nokey(srv_page_size * 2)); + byte* ptr = static_cast<byte*>(ut_align(buf, srv_page_size)); - slot = at(i); + struct iocb iocb; - if (slot->is_reserved == false) { - break; - } - } + /* Suppress valgrind warning. */ + memset(buf, 0x00, srv_page_size * 2); + memset(&iocb, 0x0, sizeof(iocb)); - /* We MUST always be able to get hold of a reserved slot. */ - ut_a(counter < m_slots.size()); + struct iocb* p_iocb = &iocb; - ut_a(slot->is_reserved == false); + if (!srv_read_only_mode) { - ++m_n_reserved; + io_prep_pwrite(p_iocb, fd, ptr, srv_page_size, 0); - if (m_n_reserved == 1) { - os_event_reset(m_is_empty); } - - if (m_n_reserved == m_slots.size()) { - os_event_reset(m_not_full); + else { + ut_a(srv_page_size >= 512); + io_prep_pread(p_iocb, fd, ptr, 512, 0); } - slot->is_reserved = true; - slot->reservation_time = time(NULL); - slot->m1 = m1; - slot->m2 = m2; - slot->file = file; - slot->name = name; -#ifdef _WIN32 - slot->len = static_cast<DWORD>(len); -#else - slot->len = len; -#endif /* _WIN32 */ - slot->type = type; - slot->buf = static_cast<byte*>(buf); - slot->ptr = slot->buf; - slot->offset = offset; - slot->err = DB_SUCCESS; - slot->original_len = static_cast<uint32>(len); - slot->io_already_done = false; - slot->buf = static_cast<byte*>(buf); - -#ifdef WIN_ASYNC_IO - { - OVERLAPPED* control; + int err = io_submit(io_ctx, 1, &p_iocb); + srv_stats.buffered_aio_submitted.inc(); - control = &slot->control; - control->Offset = (DWORD) offset & 0xFFFFFFFF; - control->OffsetHigh = (DWORD) (offset >> 32); + if (err >= 1) { + /* Now collect the submitted IO request. */ + err = io_getevents(io_ctx, 1, 1, &io_event, NULL); } -#elif defined(LINUX_NATIVE_AIO) - - /* If we are not using native AIO skip this part. */ - if (srv_use_native_aio) { - - off_t aio_offset; - - /* Check if we are dealing with 64 bit arch. - If not then make sure that offset fits in 32 bits. 
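The probe above over-allocates srv_page_size * 2 bytes and then aligns the I/O pointer with ut_align(), matching the buffer alignment native AIO requires when files are opened with O_DIRECT (and harmless otherwise). A minimal equivalent of that alignment step, assuming, as InnoDB's ut_align() does, a power-of-two alignment rounded upwards:

	#include <cstdint>

	/* Round ptr up to the next multiple of align, which must be a
	power of two. Allocating 2 * align bytes, as the probe above
	does, guarantees the aligned pointer still has at least align
	usable bytes behind it. */
	static inline void* align_up(void* ptr, std::uintptr_t align)
	{
		std::uintptr_t p = reinterpret_cast<std::uintptr_t>(ptr);
		return reinterpret_cast<void*>((p + align - 1) & ~(align - 1));
	}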
*/
-	aio_offset = (off_t) offset;
-
-	ut_a(sizeof(aio_offset) >= sizeof(offset)
-	     || ((os_offset_t) aio_offset) == offset);
-
-		struct iocb*	iocb = &slot->control;
-		if (type.is_read()) {
-
-			io_prep_pread(
-				iocb, file, slot->ptr, slot->len, aio_offset);
-		} else {
-			ut_ad(type.is_write());
+	int err = io_submit(io_ctx, 1, &p_iocb);
+	srv_stats.buffered_aio_submitted.inc();

-			io_prep_pwrite(
-				iocb, file, slot->ptr, slot->len, aio_offset);
-		}
+	if (err >= 1) {
+		/* Now collect the submitted IO request. */
+		err = io_getevents(io_ctx, 1, 1, &io_event, NULL);
	}

-		iocb->data = slot;
+	ut_free(buf);
+	my_close(fd, MYF(MY_WME));

-		slot->n_bytes = 0;
-		slot->ret = 0;
+	switch (err) {
+	case 1:
+		return(true);

+	case -EINVAL:
+	case -ENOSYS:
+		ib::error()
+			<< "Linux Native AIO not supported. You can either"
+			" move "
+			<< (srv_read_only_mode ? name : "tmpdir")
+			<< " to a file system that supports native"
+			" AIO or you can set innodb_use_native_aio to"
+			" FALSE to avoid this message.";

-	slot->n_bytes = 0;
-	slot->ret = 0;
+		/* fall through. */
+	default:
+		ib::error()
+			<< "Linux Native AIO check on "
+			<< (srv_read_only_mode ? name : "tmpdir")
+			<< " returned error[" << -err << "]";
	}
-#endif /* LINUX_NATIVE_AIO */
-
-	release();
-
-	return(slot);
+	return(false);
}
+#endif

-/** Wakes up a simulated aio i/o-handler thread if it has something to do.
-@param[in]	global_segment	The number of the segment in the AIO arrays */
-void
-AIO::wake_simulated_handler_thread(ulint global_segment)
-{
-	ut_ad(!srv_use_native_aio);
-
-	AIO*	array;
-	ulint	segment = get_array_and_local_segment(&array, global_segment);
-	array->wake_simulated_handler_thread(global_segment, segment);
-}

-/** Wakes up a simulated AIO I/O-handler thread if it has something to do
-for a local segment in the AIO array.
-@param[in]	global_segment	The number of the segment in the AIO arrays
-@param[in]	segment		The local segment in the AIO array */
-void
-AIO::wake_simulated_handler_thread(ulint global_segment, ulint segment)
+bool os_aio_init(ulint n_reader_threads, ulint n_writer_threads, ulint)
{
-	ut_ad(!srv_use_native_aio);
-
-	ulint	n = slots_per_segment();
-	ulint	offset = segment * n;
+	int max_write_events = (int)n_writer_threads * OS_AIO_N_PENDING_IOS_PER_THREAD;
+	int max_read_events = (int)n_reader_threads * OS_AIO_N_PENDING_IOS_PER_THREAD;
+	int max_ibuf_events = 1 * OS_AIO_N_PENDING_IOS_PER_THREAD;
+	int max_events = max_read_events + max_write_events + max_ibuf_events;
+	int ret;

-	/* Look through n slots after the segment * n'th slot */
-
-	acquire();
-
-	const Slot*	slot = at(offset);
-
-	for (ulint i = 0; i < n; ++i, ++slot) {
-
-		if (slot->is_reserved) {
-
-			/* Found an i/o request */
-
-			release();
-
-			os_event_t	event;
-
-			event = os_aio_segment_wait_events[global_segment];
-
-			os_event_set(event);
-
-			return;
-		}
+#ifdef LINUX_NATIVE_AIO
+	if (srv_use_native_aio && !is_linux_native_aio_supported())
+		srv_use_native_aio = false;
+#endif
+	ret = srv_thread_pool->configure_aio(srv_use_native_aio, max_events);
+	if (ret) {
+		ut_a(srv_use_native_aio);
+		srv_use_native_aio = false;
+#ifdef LINUX_NATIVE_AIO
+		ib::info() << "Linux native AIO disabled";
+#endif
+		ret = srv_thread_pool->configure_aio(srv_use_native_aio, max_events);
+		DBUG_ASSERT(!ret);
	}
-
-	release();
+	read_slots = new io_slots(max_read_events, (uint)n_reader_threads);
+	write_slots = new io_slots(max_write_events, (uint)n_writer_threads);
+	ibuf_slots = new io_slots(max_ibuf_events, 1);
+	return true;
}

-/** Wakes up simulated aio i/o-handler threads if they have something to do.
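Everything is_linux_native_aio_supported() does reduces to one question: does a real io_submit() complete on the target file? A self-contained version of the same probe, stripped of the InnoDB plumbing (link with -laio; unlike the function above, this sketch also tears down its io context with io_destroy()):

	#include <libaio.h>
	#include <stdlib.h>
	#include <string.h>

	/* Returns true iff the kernel accepts and completes one aligned
	512-byte native-AIO read on fd. On filesystems without native
	AIO support io_submit() typically fails with -EINVAL, which is
	exactly the case the switch above reports. */
	static bool probe_native_aio(int fd)
	{
		io_context_t ctx;
		memset(&ctx, 0, sizeof ctx);
		if (io_setup(1, &ctx)) {
			return false;	/* kernel lacks native AIO */
		}

		void* buf;
		if (posix_memalign(&buf, 512, 512)) {
			io_destroy(ctx);
			return false;
		}

		struct iocb cb;
		io_prep_pread(&cb, fd, buf, 512, 0);
		struct iocb* cbs[1] = { &cb };

		bool ok = false;
		if (io_submit(ctx, 1, cbs) == 1) {
			struct io_event ev;
			ok = io_getevents(ctx, 1, 1, &ev, NULL) == 1;
		}

		free(buf);
		io_destroy(ctx);
		return ok;
	}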
*/ -void -os_aio_simulated_wake_handler_threads() +void os_aio_free(void) { - if (srv_use_native_aio) { - /* We do not use simulated aio: do nothing */ - - return; - } - - os_aio_recommend_sleep_for_read_threads = false; - - for (ulint i = 0; i < os_aio_n_segments; i++) { - AIO::wake_simulated_handler_thread(i); - } + srv_thread_pool->disable_aio(); + delete read_slots; + delete write_slots; + delete ibuf_slots; } -/** Select the IO slot array -@param[in,out] type Type of IO, READ or WRITE -@param[in] read_only true if running in read-only mode -@param[in] mode IO mode -@return slot array or NULL if invalid mode specified */ -AIO* -AIO::select_slot_array(IORequest& type, bool read_only, ulint mode) +/** Waits until there are no pending writes. There can +be other, synchronous, pending writes. */ +void +os_aio_wait_until_no_pending_writes() { - AIO* array; - - ut_ad(type.validate()); - - switch (mode) { - case OS_AIO_NORMAL: - - array = type.is_read() ? AIO::s_reads : AIO::s_writes; - break; - - case OS_AIO_IBUF: - ut_ad(type.is_read()); - - /* Reduce probability of deadlock bugs in connection with ibuf: - do not let the ibuf i/o handler sleep */ - - type.clear_do_not_wake(); - - array = read_only ? AIO::s_reads : AIO::s_ibuf; - break; - - case OS_AIO_LOG: - - array = read_only ? AIO::s_reads : AIO::s_log; - break; - - case OS_AIO_SYNC: - - array = AIO::s_sync; -#if defined(LINUX_NATIVE_AIO) - /* In Linux native AIO we don't use sync IO array. */ - ut_a(!srv_use_native_aio); -#endif /* LINUX_NATIVE_AIO */ - break; - - default: - ut_error; - array = NULL; /* Eliminate compiler warning */ - } - - return(array); + write_slots->wait(); } -#ifdef WIN_ASYNC_IO -/** This function is only used in Windows asynchronous i/o. -Waits for an aio operation to complete. This function is used to wait the -for completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! -@param[in] segment The number of the segment in the aio arrays to - wait for; segment 0 is the ibuf I/O thread, - segment 1 the log I/O thread, then follow the - non-ibuf read threads, and as the last are the - non-ibuf write threads; if this is - ULINT_UNDEFINED, then it means that sync AIO - is used, and this parameter is ignored -@param[in] pos this parameter is used only in sync AIO: - wait for the aio slot at this position -@param[out] m1 the messages passed with the AIO request; note - that also in the case where the AIO operation - failed, these output parameters are valid and - can be used to restart the operation, - for example -@param[out] m2 callback message -@param[out] type OS_FILE_WRITE or ..._READ -@return DB_SUCCESS or error code */ - - - -static -dberr_t -os_aio_windows_handler( - ulint segment, - ulint pos, - fil_node_t** m1, - void** m2, - IORequest* type) -{ - Slot* slot= 0; - dberr_t err; - - BOOL ret; - ULONG_PTR key; - - ut_a(segment != ULINT_UNDEFINED); - - /* NOTE! We only access constant fields in os_aio_array. 
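With per-purpose slot caches, waiting for writes to drain no longer needs the old AIO array event bookkeeping: the new os_aio_wait_until_no_pending_writes() above is just write_slots->wait(). In terms of the hypothetical slot_cache sketched earlier (the real io_slots::wait() lives in tpool_structs.h and may differ), that wait is a single condition-variable predicate:

	/* Hypothetical continuation of the slot_cache sketch: block the
	caller until every slot has been release()d, i.e. until no I/O
	dispatched through this cache is still in flight. */
	void wait()
	{
		std::unique_lock<std::mutex> lk(m_mtx);
		m_cv.wait(lk, [this] {
			return m_free.size() == m_slots.size();
		});
	}

Sharing one condition variable between acquire() and wait() is why the sketch's release() uses notify_all() rather than notify_one(): a wakeup must be able to reach either kind of waiter.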
Therefore - we do not have to acquire the protecting mutex yet */ - - ut_ad(os_aio_validate_skip()); - AIO *my_array; - AIO::get_array_and_local_segment(&my_array, segment); - - HANDLE port = my_array->m_completion_port; - ut_ad(port); - for (;;) { - DWORD len; - ret = GetQueuedCompletionStatus(port, &len, &key, - (OVERLAPPED **)&slot, INFINITE); - - /* If shutdown key was received, repost the shutdown message and exit */ - if (ret && key == IOCP_SHUTDOWN_KEY) { - PostQueuedCompletionStatus(port, 0, key, NULL); - *m1 = NULL; - *m2 = NULL; - return (DB_SUCCESS); - } - - ut_a(slot); - - if (!ret) { - /* IO failed */ - break; - } - - slot->n_bytes= len; - ut_a(slot->array); - HANDLE slot_port = slot->array->m_completion_port; - if (slot_port != port) { - /* there are no redirections between data and log */ - ut_ad(port == data_completion_port); - ut_ad(slot_port != log_completion_port); - - /* - Redirect completions to the dedicated completion port - and threads. - - "Write array" threads receive write,read and ibuf - notifications, read and ibuf completions are redirected. - - Forwarding IO completion this way costs a context switch, - and this seems tolerable since asynchronous reads are by - far less frequent. - */ - ut_a(PostQueuedCompletionStatus(slot_port, - len, key, &slot->control)); - } - else { - break; - } - } - - ut_a(slot->is_reserved); - - *m1 = slot->m1; - *m2 = slot->m2; - - *type = slot->type; - - bool retry = false; - - if (ret && slot->n_bytes == slot->len) { - - err = DB_SUCCESS; - - } else if (os_file_handle_error(slot->name, "Windows aio")) { - - retry = true; - - } else { - - err = DB_IO_ERROR; - } - - - if (retry) { - /* Retry failed read/write operation synchronously. */ - -#ifdef UNIV_PFS_IO - /* This read/write does not go through os_file_read - and os_file_write APIs, need to register with - performance schema explicitly here. */ - PSI_file_locker_state state; - struct PSI_file_locker* locker = NULL; - - register_pfs_file_io_begin( - &state, locker, slot->file, slot->len, - slot->type.is_write() - ? PSI_FILE_WRITE : PSI_FILE_READ, __FILE__, __LINE__); -#endif /* UNIV_PFS_IO */ - - ut_a((slot->len & 0xFFFFFFFFUL) == slot->len); - - ssize_t n_bytes = SyncFileIO::execute(slot); - -#ifdef UNIV_PFS_IO - register_pfs_file_io_end(locker, slot->len); -#endif /* UNIV_PFS_IO */ - err = (n_bytes == slot->len) ? DB_SUCCESS : DB_IO_ERROR; - } - - if (err == DB_SUCCESS) { - err = AIOHandler::post_io_processing(slot); - } - - slot->array->release_with_mutex(slot); - - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS - && !buf_page_cleaner_is_active - && os_aio_all_slots_free()) { - /* Last IO, wakeup other io threads */ - AIO::wake_at_shutdown(); - } - return(err); -} -#endif /* WIN_ASYNC_IO */ /** NOTE! Use the corresponding macro os_aio(), not directly this function! 
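In the os_aio_func() hunk below, the per-array Slot bookkeeping is gone: the request context (m1, the IORequest, m2) is value-copied into the aiocb's fixed-size userdata buffer, guarded by the compile_time_assert, so a completion callback can copy it straight back out. A sketch of that consuming side (hedged: the real consumer is fil_aio_callback() in fil0fil.cc, which is not part of this diff, and the layout of os_aio_userdata_t is inferred from the {m1, type, m2} initializer in the hunk):

	#include <cstring>

	/* Hypothetical mirror of the pack/unpack protocol used by
	os_aio_func() below; not the actual fil_aio_callback(). */
	static void example_completion(const tpool::aiocb* cb)
	{
		os_aio_userdata_t	data;
		memcpy(&data, cb->m_userdata, sizeof data);
		/* data now carries the fil_node_t*, the IORequest and
		the second message pointer of the original request. */
	}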
@@ -6485,14 +4127,10 @@ os_aio_func( fil_node_t* m1, void* m2) { -#ifdef WIN_ASYNC_IO - BOOL ret = TRUE; -#endif /* WIN_ASYNC_IO */ ut_ad(n > 0); ut_ad((n % OS_FILE_LOG_BLOCK_SIZE) == 0); ut_ad((offset % OS_FILE_LOG_BLOCK_SIZE) == 0); - ut_ad(os_aio_validate_skip()); #ifdef WIN_ASYNC_IO ut_ad((n & 0xFFFFFFFFUL) == n); @@ -6511,777 +4149,43 @@ os_aio_func( return(os_file_write_func(type, name, file, buf, offset, n)); } -try_again: - - AIO* array; - - array = AIO::select_slot_array(type, read_only, mode); - - Slot* slot; - - slot = array->reserve_slot(type, m1, m2, file, name, buf, offset, n); - if (type.is_read()) { - - - if (srv_use_native_aio) { - ++os_n_file_reads; - - os_bytes_read_since_printout += n; -#ifdef WIN_ASYNC_IO - ret = ReadFile( - file, slot->ptr, slot->len, - NULL, &slot->control); -#elif defined(LINUX_NATIVE_AIO) - if (!array->linux_dispatch(slot)) { - goto err_exit; - } -#endif /* WIN_ASYNC_IO */ - } else if (type.is_wake()) { - AIO::wake_simulated_handler_thread( - AIO::get_segment_no_from_slot(array, slot)); - } } else if (type.is_write()) { - - if (srv_use_native_aio) { ++os_n_file_writes; - -#ifdef WIN_ASYNC_IO - ret = WriteFile( - file, slot->ptr, slot->len, - NULL, &slot->control); -#elif defined(LINUX_NATIVE_AIO) - if (!array->linux_dispatch(slot)) { - goto err_exit; - } -#endif /* WIN_ASYNC_IO */ - - } else if (type.is_wake()) { - AIO::wake_simulated_handler_thread( - AIO::get_segment_no_from_slot(array, slot)); - } } else { ut_error; } -#ifdef WIN_ASYNC_IO - if (ret || (GetLastError() == ERROR_IO_PENDING)) { - /* aio completed or was queued successfully! */ - return(DB_SUCCESS); + compile_time_assert(sizeof(os_aio_userdata_t) <= tpool::MAX_AIO_USERDATA_LEN); + os_aio_userdata_t userdata{m1,type,m2}; + io_slots* slots; + if (type.is_read()) { + slots = mode == OS_AIO_IBUF?ibuf_slots: read_slots; + } else { + slots = write_slots; } + tpool::aiocb* cb = slots->acquire(); - goto err_exit; + cb->m_buffer = buf; + cb->m_callback = (tpool::callback_func)io_callback; + cb->m_group = slots->get_task_group(); + cb->m_fh = file.m_file; + cb->m_len = (int)n; + cb->m_offset = offset; + cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE; + memcpy(cb->m_userdata, &userdata, sizeof(userdata)); -#endif /* WIN_ASYNC_IO */ - - /* AIO request was queued successfully! */ - return(DB_SUCCESS); + if (!srv_thread_pool->submit_io(cb)) + return DB_SUCCESS; -#if defined LINUX_NATIVE_AIO || defined WIN_ASYNC_IO -err_exit: -#endif /* LINUX_NATIVE_AIO || WIN_ASYNC_IO */ + slots->release(cb); - array->release_with_mutex(slot); - - if (os_file_handle_error( - name, type.is_read() ? "aio read" : "aio write")) { - - goto try_again; - } + os_file_handle_error(name, type.is_read() ? 
"aio read" : "aio write"); return(DB_IO_ERROR); } -/** Simulated AIO handler for reaping IO requests */ -class SimulatedAIOHandler { - -public: - - /** Constructor - @param[in,out] array The AIO array - @param[in] segment Local segment in the array */ - SimulatedAIOHandler(AIO* array, ulint segment) - : - m_oldest(), - m_n_elems(), - m_lowest_offset(IB_UINT64_MAX), - m_array(array), - m_n_slots(), - m_segment(segment), - m_ptr(), - m_buf() - { - ut_ad(m_segment < 100); - - m_slots.resize(OS_AIO_MERGE_N_CONSECUTIVE); - } - - /** Destructor */ - ~SimulatedAIOHandler() - { - if (m_ptr != NULL) { - ut_free(m_ptr); - } - } - - /** Reset the state of the handler - @param[in] n_slots Number of pending AIO operations supported */ - void init(ulint n_slots) - { - m_oldest = 0; - m_n_elems = 0; - m_n_slots = n_slots; - m_lowest_offset = IB_UINT64_MAX; - - if (m_ptr != NULL) { - ut_free(m_ptr); - m_ptr = m_buf = NULL; - } - - m_slots[0] = NULL; - } - - /** Check if there is a slot for which the i/o has already been done - @param[out] n_reserved Number of reserved slots - @return the first completed slot that is found. */ - Slot* check_completed(ulint* n_reserved) - { - ulint offset = m_segment * m_n_slots; - - *n_reserved = 0; - - Slot* slot; - - slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved) { - - if (slot->io_already_done) { - - ut_a(slot->is_reserved); - - return(slot); - } - - ++*n_reserved; - } - } - - return(NULL); - } - - /** If there are at least 2 seconds old requests, then pick the - oldest one to prevent starvation. If several requests have the - same age, then pick the one at the lowest offset. - @return true if request was selected */ - bool select() - { - if (!select_oldest()) { - - return(select_lowest_offset()); - } - - return(true); - } - - /** Check if there are several consecutive blocks - to read or write. Merge them if found. */ - void merge() - { - /* if m_n_elems != 0, then we have assigned - something valid to consecutive_ios[0] */ - ut_ad(m_n_elems != 0); - ut_ad(first_slot() != NULL); - - Slot* slot = first_slot(); - - while (!merge_adjacent(slot)) { - /* No op */ - } - } - - /** We have now collected n_consecutive I/O requests - in the array; allocate a single buffer which can hold - all data, and perform the I/O - @return the length of the buffer */ - ulint allocate_buffer() - MY_ATTRIBUTE((warn_unused_result)) - { - ulint len; - Slot* slot = first_slot(); - - ut_ad(m_ptr == NULL); - - if (slot->type.is_read() && m_n_elems > 1) { - - len = 0; - - for (ulint i = 0; i < m_n_elems; ++i) { - len += m_slots[i]->len; - } - - m_ptr = static_cast<byte*>( - ut_malloc_nokey(len + srv_page_size)); - - m_buf = static_cast<byte*>( - ut_align(m_ptr, srv_page_size)); - - } else { - len = first_slot()->len; - m_buf = first_slot()->buf; - } - - return(len); - } - - /** We have to compress the individual pages and punch - holes in them on a page by page basis when writing to - tables that can be compresed at the IO level. 
- @param[in] len Value returned by allocate_buffer */ - void copy_to_buffer(ulint len) - { - Slot* slot = first_slot(); - - if (len > slot->len && slot->type.is_write()) { - - byte* ptr = m_buf; - - ut_ad(ptr != slot->buf); - - /* Copy the buffers to the combined buffer */ - for (ulint i = 0; i < m_n_elems; ++i) { - - slot = m_slots[i]; - - memmove(ptr, slot->buf, slot->len); - - ptr += slot->len; - } - } - } - - /** Do the I/O with ordinary, synchronous i/o functions: - @param[in] len Length of buffer for IO */ - void io() - { - if (first_slot()->type.is_write()) { - - for (ulint i = 0; i < m_n_elems; ++i) { - write(m_slots[i]); - } - - } else { - - for (ulint i = 0; i < m_n_elems; ++i) { - read(m_slots[i]); - } - } - } - - /** Mark the i/os done in slots */ - void done() - { - for (ulint i = 0; i < m_n_elems; ++i) { - m_slots[i]->io_already_done = true; - } - } - - /** @return the first slot in the consecutive array */ - Slot* first_slot() - MY_ATTRIBUTE((warn_unused_result)) - { - ut_a(m_n_elems > 0); - - return(m_slots[0]); - } - - /** Wait for I/O requests - @param[in] global_segment The global segment - @param[in,out] event Wait on event if no active requests - @return the number of slots */ - ulint check_pending( - ulint global_segment, - os_event_t event) - MY_ATTRIBUTE((warn_unused_result)); -private: - - /** Do the file read - @param[in,out] slot Slot that has the IO context */ - void read(Slot* slot) - { - dberr_t err = os_file_read( - slot->type, - slot->file, - slot->ptr, - slot->offset, - slot->len); - - ut_a(err == DB_SUCCESS); - } - - /** Do the file read - @param[in,out] slot Slot that has the IO context */ - void write(Slot* slot) - { - dberr_t err = os_file_write( - slot->type, - slot->name, - slot->file, - slot->ptr, - slot->offset, - slot->len); - - ut_a(err == DB_SUCCESS); - } - - /** @return true if the slots are adjacent and can be merged */ - bool adjacent(const Slot* s1, const Slot* s2) const - { - return(s1 != s2 - && s1->file == s2->file - && s2->offset == s1->offset + s1->len - && s1->type == s2->type); - } - - /** @return true if merge limit reached or no adjacent slots found. */ - bool merge_adjacent(Slot*& current) - { - Slot* slot; - ulint offset = m_segment * m_n_slots; - - slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved && adjacent(current, slot)) { - - current = slot; - - /* Found a consecutive i/o request */ - - m_slots[m_n_elems] = slot; - - ++m_n_elems; - - return(m_n_elems >= m_slots.capacity()); - } - } - - return(true); - } - - /** There were no old requests. Look for an I/O request at the lowest - offset in the array (we ignore the high 32 bits of the offset in these - heuristics) */ - bool select_lowest_offset() - { - ut_ad(m_n_elems == 0); - - ulint offset = m_segment * m_n_slots; - - m_lowest_offset = IB_UINT64_MAX; - - for (ulint i = 0; i < m_n_slots; ++i) { - Slot* slot; - - slot = m_array->at(i + offset); - - if (slot->is_reserved - && slot->offset < m_lowest_offset) { - - /* Found an i/o request */ - m_slots[0] = slot; - - m_n_elems = 1; - - m_lowest_offset = slot->offset; - } - } - - return(m_n_elems > 0); - } - - /** Select the slot if it is older than the current oldest slot. 
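The removed merge heuristic above only coalesced slots that hit the same file, back to back, with the same I/O type; one concrete case makes the adjacent() predicate easier to read:

	/* Worked example of the removed adjacency test, 16KiB pages:
	   s1 = (file F, offset 0,     len 16384, read)
	   s2 = (file F, offset 16384, len 16384, read)
	   s2->offset == s1->offset + s1->len holds, so s1 and s2 were
	   collected into a single 32KiB simulated-AIO read. A write at
	   offset 16384, or a read on another file, was never merged. */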
- @param[in] slot The slot to check */ - void select_if_older(Slot* slot) - { - ulint age; - - age = (ulint) difftime(time(NULL), slot->reservation_time); - - if ((age >= 2 && age > m_oldest) - || (age >= 2 - && age == m_oldest - && slot->offset < m_lowest_offset)) { - - /* Found an i/o request */ - m_slots[0] = slot; - - m_n_elems = 1; - - m_oldest = age; - - m_lowest_offset = slot->offset; - } - } - - /** Select th oldest slot in the array - @return true if oldest slot found */ - bool select_oldest() - { - ut_ad(m_n_elems == 0); - - Slot* slot; - ulint offset = m_n_slots * m_segment; - - slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved) { - select_if_older(slot); - } - } - - return(m_n_elems > 0); - } - - typedef std::vector<Slot*> slots_t; - -private: - ulint m_oldest; - ulint m_n_elems; - os_offset_t m_lowest_offset; - - AIO* m_array; - ulint m_n_slots; - ulint m_segment; - - slots_t m_slots; - - byte* m_ptr; - byte* m_buf; -}; - -/** Wait for I/O requests -@return the number of slots */ -ulint -SimulatedAIOHandler::check_pending( - ulint global_segment, - os_event_t event) -{ - /* NOTE! We only access constant fields in os_aio_array. - Therefore we do not have to acquire the protecting mutex yet */ - - ut_ad(os_aio_validate_skip()); - - ut_ad(m_segment < m_array->get_n_segments()); - - /* Look through n slots after the segment * n'th slot */ - - if (AIO::is_read(m_array) - && os_aio_recommend_sleep_for_read_threads) { - - /* Give other threads chance to add several - I/Os to the array at once. */ - - srv_set_io_thread_op_info( - global_segment, "waiting for i/o request"); - - os_event_wait(event); - - return(0); - } - - return(m_array->slots_per_segment()); -} - -/** Does simulated AIO. This function should be called by an i/o-handler -thread. - -@param[in] segment The number of the segment in the aio arrays to wait - for; segment 0 is the ibuf i/o thread, segment 1 the - log i/o thread, then follow the non-ibuf read threads, - and as the last are the non-ibuf write threads -@param[out] m1 the messages passed with the AIO request; note that - also in the case where the AIO operation failed, these - output parameters are valid and can be used to restart - the operation, for example -@param[out] m2 Callback argument -@param[in] type IO context -@return DB_SUCCESS or error code */ -static -dberr_t -os_aio_simulated_handler( - ulint global_segment, - fil_node_t** m1, - void** m2, - IORequest* type) -{ - Slot* slot; - AIO* array; - ulint segment; - os_event_t event = os_aio_segment_wait_events[global_segment]; - - segment = AIO::get_array_and_local_segment(&array, global_segment); - - SimulatedAIOHandler handler(array, segment); - - for (;;) { - - srv_set_io_thread_op_info( - global_segment, "looking for i/o requests (a)"); - - ulint n_slots = handler.check_pending(global_segment, event); - - if (n_slots == 0) { - continue; - } - - handler.init(n_slots); - - srv_set_io_thread_op_info( - global_segment, "looking for i/o requests (b)"); - - array->acquire(); - - ulint n_reserved; - - slot = handler.check_completed(&n_reserved); - - if (slot != NULL) { - - break; - - } else if (n_reserved == 0 - && !buf_page_cleaner_is_active - && srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { - - /* There is no completed request. If there - are no pending request at all, and the system - is being shut down, exit. 
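The removed select_oldest()/select_if_older() pair encoded the simulated-AIO anti-starvation rule: any request at least two seconds old beats the lowest-offset heuristic, and ties between equally old requests go to the lower offset. A concrete trace:

	/* Trace of the removed rule at time T, three reserved slots:
	   A: reserved at T-1, offset 96 MiB -> age 1, below the 2s bar
	   B: reserved at T-3, offset 64 MiB -> age 3, selected first
	   C: reserved at T-3, offset 32 MiB -> age 3 == m_oldest but a
	      lower offset, so C displaces B
	   Only when no slot is at least 2s old does the pure
	   lowest-offset scan in select_lowest_offset() run. */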
*/ - - array->release(); - - *m1 = NULL; - - *m2 = NULL; - - return(DB_SUCCESS); - - } else if (handler.select()) { - - break; - } - - /* No I/O requested at the moment */ - - srv_set_io_thread_op_info( - global_segment, "resetting wait event"); - - /* We wait here until tbere are more IO requests - for this segment. */ - - os_event_reset(event); - - array->release(); - - srv_set_io_thread_op_info( - global_segment, "waiting for i/o request"); - - os_event_wait(event); - } - - /** Found a slot that has already completed its IO */ - - if (slot == NULL) { - /* Merge adjacent requests */ - handler.merge(); - - /* Check if there are several consecutive blocks - to read or write */ - - srv_set_io_thread_op_info( - global_segment, "consecutive i/o requests"); - - // Note: We don't support write combining for simulated AIO. - //ulint total_len = handler.allocate_buffer(); - - /* We release the array mutex for the time of the I/O: NOTE that - this assumes that there is just one i/o-handler thread serving - a single segment of slots! */ - - array->release(); - - // Note: We don't support write combining for simulated AIO. - //handler.copy_to_buffer(total_len); - - srv_set_io_thread_op_info(global_segment, "doing file i/o"); - - handler.io(); - - srv_set_io_thread_op_info(global_segment, "file i/o done"); - - array->acquire(); - - handler.done(); - - /* We return the messages for the first slot now, and if there - were several slots, the messages will be returned with - subsequent calls of this function */ - - slot = handler.first_slot(); - } - - ut_ad(slot->is_reserved); - - *m1 = slot->m1; - *m2 = slot->m2; - - *type = slot->type; - - array->release(slot); - - array->release(); - - return(DB_SUCCESS); -} - -/** Get the total number of pending IOs -@return the total number of pending IOs */ -ulint -AIO::total_pending_io_count() -{ - ulint count = s_reads->pending_io_count(); - - if (s_writes != NULL) { - count += s_writes->pending_io_count(); - } - - if (s_ibuf != NULL) { - count += s_ibuf->pending_io_count(); - } - - if (s_log != NULL) { - count += s_log->pending_io_count(); - } - - if (s_sync != NULL) { - count += s_sync->pending_io_count(); - } - - return(count); -} - -/** Validates the consistency the aio system. -@return true if ok */ -static -bool -os_aio_validate() -{ - /* The methods countds and validates, we ignore the count. */ - AIO::total_pending_io_count(); - - return(true); -} - -/** Prints pending IO requests per segment of an aio array. -We probably don't need per segment statistics but they can help us -during development phase to see if the IO requests are being -distributed as expected. -@param[in,out] file File where to print -@param[in] segments Pending IO array */ -void -AIO::print_segment_info( - FILE* file, - const ulint* segments) -{ - ut_ad(m_n_segments > 0); - - if (m_n_segments > 1) { - - fprintf(file, " ["); - - for (ulint i = 0; i < m_n_segments; ++i, ++segments) { - - if (i != 0) { - fprintf(file, ", "); - } - - fprintf(file, ULINTPF, *segments); - } - - fprintf(file, "] "); - } -} - -/** Prints info about the aio array. 
-@param[in,out] file Where to print */ -void -AIO::print(FILE* file) -{ - ulint count = 0; - ulint n_res_seg[SRV_MAX_N_IO_THREADS]; - - mutex_enter(&m_mutex); - - ut_a(!m_slots.empty()); - ut_a(m_n_segments > 0); - - memset(n_res_seg, 0x0, sizeof(n_res_seg)); - - for (ulint i = 0; i < m_slots.size(); ++i) { - Slot& slot = m_slots[i]; - ulint segment = (i * m_n_segments) / m_slots.size(); - - if (slot.is_reserved) { - - ++count; - - ++n_res_seg[segment]; - - ut_a(slot.len > 0); - } - } - - ut_a(m_n_reserved == count); - - print_segment_info(file, n_res_seg); - - mutex_exit(&m_mutex); -} - -/** Print all the AIO segments -@param[in,out] file Where to print */ -void -AIO::print_all(FILE* file) -{ - s_reads->print(file); - - if (s_writes != NULL) { - fputs(", aio writes:", file); - s_writes->print(file); - } - - if (s_ibuf != NULL) { - fputs(",\n ibuf aio reads:", file); - s_ibuf->print(file); - } - - if (s_log != NULL) { - fputs(", log i/o's:", file); - s_log->print(file); - } - - if (s_sync != NULL) { - fputs(", sync i/o's:", file); - s_sync->print(file); - } -} - /** Prints info of the aio arrays. @param[in,out] file file where to print */ void @@ -7297,19 +4201,11 @@ os_aio_print(FILE* file) srv_io_thread_op_info[i], srv_io_thread_function[i]); -#ifndef _WIN32 - if (!srv_use_native_aio - && os_event_is_set(os_aio_segment_wait_events[i])) { - fprintf(file, " ev set"); - } -#endif /* _WIN32 */ - fprintf(file, "\n"); } fputs("Pending normal aio reads:", file); - AIO::print_all(file); putc('\n', file); current_time = time(NULL); @@ -7381,82 +4277,6 @@ os_aio_refresh_stats() os_last_printout = time(NULL); } -/** Checks that all slots in the system have been freed, that is, there are -no pending io operations. -@return true if all free */ -bool -os_aio_all_slots_free() -{ - return(AIO::total_pending_io_count() == 0); -} - -#ifdef UNIV_DEBUG -/** Prints all pending IO for the array -@param[in] file file where to print -@param[in] array array to process */ -void -AIO::to_file(FILE* file) const -{ - acquire(); - - fprintf(file, " " ULINTPF "\n", m_n_reserved); - - for (ulint i = 0; i < m_slots.size(); ++i) { - - const Slot& slot = m_slots[i]; - - if (slot.is_reserved) { - - fprintf(file, - "%s IO for %s (offset=" UINT64PF - ", size=%lu)\n", - slot.type.is_read() ? "read" : "write", - slot.name, slot.offset, (unsigned long)(slot.len)); - } - } - - release(); -} - -/** Print pending IOs for all arrays */ -void -AIO::print_to_file(FILE* file) -{ - fprintf(file, "Pending normal aio reads:"); - - s_reads->to_file(file); - - if (s_writes != NULL) { - fprintf(file, "Pending normal aio writes:"); - s_writes->to_file(file); - } - - if (s_ibuf != NULL) { - fprintf(file, "Pending ibuf aio reads:"); - s_ibuf->to_file(file); - } - - if (s_log != NULL) { - fprintf(file, "Pending log i/o's:"); - s_log->to_file(file); - } - - if (s_sync != NULL) { - fprintf(file, "Pending sync i/o's:"); - s_sync->to_file(file); - } -} - -/** Prints all pending IO -@param[in] file File where to print */ -void -os_aio_print_pending_io( - FILE* file) -{ - AIO::print_to_file(file); -} - -#endif /* UNIV_DEBUG */ /** Set the file create umask |