Diffstat (limited to 'storage/innobase/os/os0file.cc')
-rw-r--r--  storage/innobase/os/os0file.cc | 3634
1 file changed, 227 insertions(+), 3407 deletions(-)
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index f9028122f8d..361e34549c7 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -53,6 +53,7 @@ Created 10/21/1995 Heikki Tuuri
#include "os0thread.h"
#include <vector>
+#include <tpool_structs.h>
#ifdef LINUX_NATIVE_AIO
#include <libaio.h>
@@ -78,601 +79,80 @@ Created 10/21/1995 Heikki Tuuri
#endif
-/** Insert buffer segment id */
-static const ulint IO_IBUF_SEGMENT = 0;
-
-/** Log segment id */
-static const ulint IO_LOG_SEGMENT = 1;
-
-/** Number of retries for partial I/O's */
-static const ulint NUM_RETRIES_ON_PARTIAL_IO = 10;
-
-/* This specifies the file permissions InnoDB uses when it creates files in
-Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to
-my_umask */
-
-#ifndef _WIN32
-/** Umask for creating files */
-static ulint os_innodb_umask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
-#else
-/** Umask for creating files */
-static ulint os_innodb_umask = 0;
-static HANDLE data_completion_port;
-static HANDLE log_completion_port;
-
-static DWORD fls_sync_io = FLS_OUT_OF_INDEXES;
-#define IOCP_SHUTDOWN_KEY (ULONG_PTR)-1
-#endif /* _WIN32 */
-
-/** In simulated aio, merge at most this many consecutive i/os */
-static const ulint OS_AIO_MERGE_N_CONSECUTIVE = 64;
-
-/** Flag indicating if the page_cleaner is in active state. */
-extern bool buf_page_cleaner_is_active;
-
-#ifdef WITH_INNODB_DISALLOW_WRITES
-#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event)
-#else
-#define WAIT_ALLOW_WRITES() do { } while (0)
-#endif /* WITH_INNODB_DISALLOW_WRITES */
-
-/**********************************************************************
-
-InnoDB AIO Implementation:
-=========================
-
-We support native AIO for Windows and Linux. For the rest of the platforms
-we simulate AIO by special IO-threads servicing the IO-requests.
-
-Simulated AIO:
-==============
-
-On platforms where we 'simulate' AIO, the following is a rough explanation
-of the high level design.
-There are four io-threads (for ibuf, log, read, write).
-All synchronous IO requests are serviced by the calling thread using
-os_file_write/os_file_read. The Asynchronous requests are queued up
-in an array (there are four such arrays) by the calling thread.
-Later these requests are picked up by the IO-thread and are serviced
-synchronously.
-
-Windows native AIO:
-==================
-
-If srv_use_native_aio is not set then Windows follows the same
-code path as simulated AIO. If the flag is set then the native AIO
-interface is used. On Windows, one of the limitations is that if a file
-is opened for AIO no synchronous IO can be done on it. Therefore we have
-an extra fifth array to queue up synchronous IO requests.
-There are innodb_file_io_threads helper threads. These threads work
-on the four arrays mentioned above in Simulated AIO. No thread is
-required for the sync array.
-If a synchronous IO request is made, it is first queued in the sync
-array. Then the calling thread itself waits on the request, thus
-making the call synchronous.
-If an AIO request is made the calling thread not only queues it in the
-array but also submits the request. The helper thread then collects
-the completed IO request and calls the completion routine on it.
-
-Linux native AIO:
-=================
-
-If we have libaio installed on the system and innodb_use_native_aio
-is set to true we follow the code path of native AIO, otherwise we
-do simulated AIO.
-There are innodb_file_io_threads helper threads. These threads work
-on the four arrays mentioned above in Simulated AIO.
-If a synchronous IO request is made, it is handled by calling
-os_file_write/os_file_read.
-If an AIO request is made the calling thread not only queues it in the
-array but also submits the request. The helper thread then collects
-the completed IO request and calls the completion routine on it.
-
-**********************************************************************/
-
-
-#ifdef UNIV_PFS_IO
-/* Keys to register InnoDB I/O with performance schema */
-mysql_pfs_key_t innodb_data_file_key;
-mysql_pfs_key_t innodb_log_file_key;
-mysql_pfs_key_t innodb_temp_file_key;
-#endif /* UNIV_PFS_IO */
-
-class AIO;
-
-/** The asynchronous I/O context */
-struct Slot {
-
-#ifdef WIN_ASYNC_IO
- /** Windows control block for the aio request
- must be at the very start of Slot, so we can
- cast Slot* to OVERLAPPED*
- */
- OVERLAPPED control;
-#endif
-
- /** index of the slot in the aio array */
- uint16_t pos;
-
- /** true if this slot is reserved */
- bool is_reserved;
-
- /** time when reserved */
- time_t reservation_time;
-
- /** buffer used in i/o */
- byte* buf;
-
- /** Buffer pointer used for actual IO. We advance this
- when partial IO is required and not buf */
- byte* ptr;
-
- /** OS_FILE_READ or OS_FILE_WRITE */
- IORequest type;
-
- /** file offset in bytes */
- os_offset_t offset;
-
- /** file where to read or write */
- pfs_os_file_t file;
-
- /** file name or path */
- const char* name;
-
- /** used only in simulated aio: true if the physical i/o
- is already done and only the slot message needs to be passed
- to the caller of os_aio_simulated_handle */
- bool io_already_done;
-
- /** file block size */
- ulint file_block_size;
-
- /** The file node for which the IO is requested. */
- fil_node_t* m1;
-
- /** the requester of an aio operation and which can be used
- to identify which pending aio operation was completed */
- void* m2;
-
- /** AIO completion status */
- dberr_t err;
-
-#ifdef WIN_ASYNC_IO
-
- /** bytes written/read */
- DWORD n_bytes;
-
- /** length of the block to read or write */
- DWORD len;
-
- /** aio array containing this slot */
- AIO *array;
-#elif defined(LINUX_NATIVE_AIO)
- /** Linux control block for aio */
- struct iocb control;
-
- /** AIO return code */
- int ret;
-
- /** bytes written/read. */
- ssize_t n_bytes;
-
- /** length of the block to read or write */
- ulint len;
-#else
- /** length of the block to read or write */
- ulint len;
-
- /** bytes written/read. */
- ulint n_bytes;
-#endif /* WIN_ASYNC_IO */
-
- /** Length of the block before it was compressed */
- uint32 original_len;
-
-};
-
-/** The asynchronous i/o array structure */
-class AIO {
+/* Per-IO-operation environment: a cache of AIO control blocks
+plus a task group that bounds callback concurrency */
+class io_slots
+{
+private:
+ tpool::cache<tpool::aiocb> m_cache;
+ tpool::task_group m_group;
public:
- /** Constructor
- @param[in] id Latch ID
- @param[in] n_slots Number of slots to configure
- @param[in] segments Number of segments to configure */
- AIO(latch_id_t id, ulint n_slots, ulint segments);
-
- /** Destructor */
- ~AIO();
-
- /** Initialize the instance
- @return DB_SUCCESS or error code */
- dberr_t init();
-
- /** Requests for a slot in the aio array. If no slot is available, waits
- until not_full-event becomes signaled.
-
- @param[in] type IO context
- @param[in,out] m1 message to be passed along with the AIO
- operation
- @param[in,out] m2 message to be passed along with the AIO
- operation
- @param[in] file file handle
- @param[in] name name of the file or path as a null-terminated
- string
- @param[in,out] buf buffer where to read or from which to write
- @param[in] offset file offset, where to read from or start writing
- @param[in] len length of the block to read or write
- @return pointer to slot */
- Slot* reserve_slot(
- const IORequest& type,
- fil_node_t* m1,
- void* m2,
- pfs_os_file_t file,
- const char* name,
- void* buf,
- os_offset_t offset,
- ulint len)
- MY_ATTRIBUTE((warn_unused_result));
-
- /** @return number of reserved slots */
- ulint pending_io_count() const;
-
- /** Returns a pointer to the nth slot in the aio array.
- @param[in] index Index of the slot in the array
- @return pointer to slot */
- const Slot* at(ulint i) const
- MY_ATTRIBUTE((warn_unused_result))
+ io_slots(int max_submitted_io, int max_callback_concurrency) :
+ m_cache(max_submitted_io),
+ m_group(max_callback_concurrency)
{
- ut_a(i < m_slots.size());
-
- return(&m_slots[i]);
}
-
- /** Non const version */
- Slot* at(ulint i)
- MY_ATTRIBUTE((warn_unused_result))
+ /* Get cached AIO control block */
+ tpool::aiocb* acquire()
{
- ut_a(i < m_slots.size());
-
- return(&m_slots[i]);
+ return m_cache.get();
}
-
- /** Frees a slot in the AIO array, assumes caller owns the mutex.
- @param[in,out] slot Slot to release */
- void release(Slot* slot);
-
- /** Frees a slot in the AIO array, assumes caller doesn't own the mutex.
- @param[in,out] slot Slot to release */
- void release_with_mutex(Slot* slot);
-
- /** Prints info about the aio array.
- @param[in,out] file Where to print */
- void print(FILE* file);
-
- /** @return the number of slots per segment */
- ulint slots_per_segment() const
- MY_ATTRIBUTE((warn_unused_result))
- {
- return(m_slots.size() / m_n_segments);
- }
-
- /** @return accessor for n_segments */
- ulint get_n_segments() const
- MY_ATTRIBUTE((warn_unused_result))
+ /* Release AIO control block back to cache */
+ void release(tpool::aiocb* aiocb)
{
- return(m_n_segments);
+ m_cache.put(aiocb);
}
-#ifdef UNIV_DEBUG
- /** @return true if the thread owns the mutex */
- bool is_mutex_owned() const
- MY_ATTRIBUTE((warn_unused_result))
+ bool contains(tpool::aiocb* aiocb)
{
- return(mutex_own(&m_mutex));
+ return m_cache.contains(aiocb);
}
-#endif /* UNIV_DEBUG */
- /** Acquire the mutex */
- void acquire() const
+ /* Wait for completion of all AIO operations */
+ void wait()
{
- mutex_enter(&m_mutex);
+ m_cache.wait();
}
- /** Release the mutex */
- void release() const
- {
- mutex_exit(&m_mutex);
- }
-
- /** Write out the state to the file/stream
- @param[in, out] file File to write to */
- void to_file(FILE* file) const;
-
-#ifdef LINUX_NATIVE_AIO
- /** Dispatch an AIO request to the kernel.
- @param[in,out] slot an already reserved slot
- @return true on success. */
- bool linux_dispatch(Slot* slot)
- MY_ATTRIBUTE((warn_unused_result));
-
- /** Accessor for an AIO event
- @param[in] index Index into the array
- @return the event at the index */
- io_event* io_events(ulint index)
- MY_ATTRIBUTE((warn_unused_result))
+ tpool::task_group* get_task_group()
{
- ut_a(index < m_events.size());
-
- return(&m_events[index]);
+ return &m_group;
}
- /** Accessor for the AIO context
- @param[in] segment Segment for which to get the context
- @return the AIO context for the segment */
- io_context* io_ctx(ulint segment)
- MY_ATTRIBUTE((warn_unused_result))
+ ~io_slots()
{
- ut_ad(segment < get_n_segments());
-
- return(m_aio_ctx[segment]);
+ wait();
}
+};
- /** Creates an io_context for native linux AIO.
- @param[in] max_events number of events
- @param[out] io_ctx io_ctx to initialize.
- @return true on success. */
- static bool linux_create_io_ctx(unsigned max_events, io_context_t* io_ctx)
- MY_ATTRIBUTE((warn_unused_result));
+io_slots* read_slots;
+io_slots* write_slots;
+io_slots* ibuf_slots;
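
For orientation, a minimal usage sketch of the new slot caches (not part of
the patch): acquire a control block, fill it in, submit it to the thread
pool, and release it from the completion callback. The aiocb field names
(m_fh, m_buffer, m_len, m_offset, m_callback, m_group) and
srv_thread_pool->submit_io() are assumptions about the tpool interface;
only m_opcode, bind()/unbind() and the io_slots methods appear verbatim in
this diff.

static dberr_t submit_read_sketch(const pfs_os_file_t& file, void* buf,
                                  unsigned len, os_offset_t offset)
{
	/* Blocks until a cached control block becomes available. */
	tpool::aiocb* cb = read_slots->acquire();

	cb->m_fh = file.m_file;	/* assumed field names, see above */
	cb->m_opcode = tpool::aio_opcode::AIO_PREAD;
	cb->m_offset = offset;
	cb->m_buffer = buf;
	cb->m_len = len;
	cb->m_callback = (tpool::callback_func) io_callback;
	cb->m_group = read_slots->get_task_group();

	/* On completion the pool runs io_callback(), which returns cb
	to read_slots (see io_callback() later in this diff). */
	if (srv_thread_pool->submit_io(cb)) {
		read_slots->release(cb);
		return DB_IO_ERROR;
	}

	return DB_SUCCESS;
}
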
- /** Checks if the system supports native linux aio. On some kernel
- versions where native aio is supported it won't work on tmpfs. In such
- cases we can't use native aio as it is not possible to mix simulated
- and native aio.
- @return true if supported, false otherwise. */
- static bool is_linux_native_aio_supported()
- MY_ATTRIBUTE((warn_unused_result));
-#endif /* LINUX_NATIVE_AIO */
+/** Number of retries for partial I/O's */
+static const ulint NUM_RETRIES_ON_PARTIAL_IO = 10;
-#ifdef WIN_ASYNC_IO
- HANDLE m_completion_port;
- /** Wake up all AIO threads in Windows native aio */
- static void wake_at_shutdown() {
- AIO *all_arrays[] = {s_reads, s_writes, s_log, s_ibuf };
- for (size_t i = 0; i < array_elements(all_arrays); i++) {
- AIO *a = all_arrays[i];
- if (a) {
- PostQueuedCompletionStatus(a->m_completion_port, 0,
- IOCP_SHUTDOWN_KEY, 0);
- }
- }
- }
-#endif /* WIN_ASYNC_IO */
+/* This specifies the file permissions InnoDB uses when it creates files in
+Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to
+my_umask */
-#ifdef _WIN32
- /** This function can be called if one wants to post a batch of reads
- and prefers an I/O-handler thread to handle them all at once later. You
- must call os_aio_simulated_wake_handler_threads later to ensure the
- threads are not left sleeping! */
- static void simulated_put_read_threads_to_sleep();
+#ifndef _WIN32
+/** Umask for creating files */
+static ulint os_innodb_umask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+#else
+/** Umask for creating files */
+static ulint os_innodb_umask = 0;
#endif /* _WIN32 */
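
Despite the name, os_innodb_umask is passed as the mode argument of
open(2) when InnoDB creates a file; a sketch of the POSIX path, with the
exact flag combination being an assumption:

	/* Sketch: os_innodb_umask supplies the permission bits for a
	newly created data file on POSIX systems. */
	int fd = ::open(name, O_RDWR | O_CREAT | O_CLOEXEC,
			static_cast<mode_t>(os_innodb_umask));
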
- /** Create an instance using new(std::nothrow)
- @param[in] id Latch ID
- @param[in] n_slots The number of AIO request slots
- @param[in] segments The number of segments
- @return a new AIO instance */
- static AIO* create(
- latch_id_t id,
- ulint n_slots,
- ulint segments)
- MY_ATTRIBUTE((warn_unused_result));
-
- /** Initializes the asynchronous io system. Creates one array each
- for ibuf and log I/O. Also creates one array each for read and write
- where each array is divided logically into n_readers and n_writers
- respectively. The caller must create an i/o handler thread for each
- segment in these arrays. This function also creates the sync array.
- No I/O handler thread needs to be created for that
- @param[in] n_per_seg maximum number of pending aio
- operations allowed per segment
- @param[in] n_readers number of reader threads
- @param[in] n_writers number of writer threads
- @param[in] n_slots_sync number of slots in the sync aio array
- @return true if AIO sub-system was started successfully */
- static bool start(
- ulint n_per_seg,
- ulint n_readers,
- ulint n_writers,
- ulint n_slots_sync)
- MY_ATTRIBUTE((warn_unused_result));
-
- /** Free the AIO arrays */
- static void shutdown();
-
- /** Print all the AIO segments
- @param[in,out] file Where to print */
- static void print_all(FILE* file);
-
- /** Calculates local segment number and aio array from global
- segment number.
- @param[out] array AIO wait array
- @param[in] segment global segment number
- @return local segment number within the aio array */
- static ulint get_array_and_local_segment(
- AIO** array,
- ulint segment)
- MY_ATTRIBUTE((warn_unused_result));
-
- /** Select the IO slot array
- @param[in,out] type Type of IO, READ or WRITE
- @param[in] read_only true if running in read-only mode
- @param[in] mode IO mode
- @return slot array or NULL if invalid mode specified */
- static AIO* select_slot_array(
- IORequest& type,
- bool read_only,
- ulint mode)
- MY_ATTRIBUTE((warn_unused_result));
-
- /** Calculates segment number for a slot.
- @param[in] array AIO wait array
- @param[in] slot slot in this array
- @return segment number (which is the number used by, for example,
- I/O handler threads) */
- static ulint get_segment_no_from_slot(
- const AIO* array,
- const Slot* slot)
- MY_ATTRIBUTE((warn_unused_result));
-
- /** Wakes up a simulated AIO I/O-handler thread if it has something
- to do.
- @param[in] global_segment the number of the segment in the
- AIO arrays */
- static void wake_simulated_handler_thread(ulint global_segment);
-
- /** Check if it is a read request
- @param[in] aio The AIO instance to check
- @return true if the AIO instance is for reading. */
- static bool is_read(const AIO* aio)
- MY_ATTRIBUTE((warn_unused_result))
- {
- return(s_reads == aio);
- }
-
- /** Wait on an event until no pending writes */
- static void wait_until_no_pending_writes()
- {
- os_event_wait(AIO::s_writes->m_is_empty);
- }
-
- /** Print to file
- @param[in] file File to write to */
- static void print_to_file(FILE* file);
-
- /** Check for pending IO. Gets the count and also validates the
- data structures.
- @return count of pending IO requests */
- static ulint total_pending_io_count();
-
-private:
- /** Initialise the slots
- @return DB_SUCCESS or error code */
- dberr_t init_slots()
- MY_ATTRIBUTE((warn_unused_result));
-
- /** Wakes up a simulated AIO I/O-handler thread if it has something
- to do for a local segment in the AIO array.
- @param[in] global_segment the number of the segment in the
- AIO arrays
- @param[in] segment the local segment in the AIO array */
- void wake_simulated_handler_thread(ulint global_segment, ulint segment);
-
- /** Prints pending IO requests per segment of an aio array.
- We probably don't need per segment statistics but they can help us
- during development phase to see if the IO requests are being
- distributed as expected.
- @param[in,out] file file where to print
- @param[in] segments pending IO array */
- void print_segment_info(
- FILE* file,
- const ulint* segments);
-
-#ifdef LINUX_NATIVE_AIO
- /** Initialise the Linux native AIO data structures
- @return DB_SUCCESS or error code */
- dberr_t init_linux_native_aio()
- MY_ATTRIBUTE((warn_unused_result));
-#endif /* LINUX_NATIVE_AIO */
-
-private:
- typedef std::vector<Slot> Slots;
-
- /** the mutex protecting the aio array */
- mutable SysMutex m_mutex;
-
- /** Pointer to the slots in the array.
- Number of elements must be divisible by n_threads. */
- Slots m_slots;
-
- /** Number of segments in the aio array of pending aio requests.
- A thread can wait separately for any one of the segments. */
- ulint m_n_segments;
-
- /** The event which is set to the signaled state when
- there is space in the aio outside the ibuf segment;
- os_event_set() and os_event_reset() are protected by AIO::m_mutex */
- os_event_t m_not_full;
-
- /** The event which is set to the signaled state when
- there are no pending i/os in this array;
- os_event_set() and os_event_reset() are protected by AIO::m_mutex */
- os_event_t m_is_empty;
-
- /** Number of reserved slots in the AIO array outside
- the ibuf segment */
- ulint m_n_reserved;
-
-
-#if defined(LINUX_NATIVE_AIO)
- typedef std::vector<io_event> IOEvents;
-
- /** completion queue for IO. There is one such queue per
- segment. Each thread will work on one ctx exclusively. */
- io_context_t* m_aio_ctx;
-
- /** The array to collect completed IOs. There is one such
- event for each possible pending IO. The size of the array
- is equal to m_slots.size(). */
- IOEvents m_events;
-#endif /* LINUX_NATIVE_AIO */
-
- /** The aio arrays for non-ibuf i/o and ibuf i/o, as well as
- sync AIO. These are NULL when the module has not yet been
- initialized. */
-
- /** Insert buffer */
- static AIO* s_ibuf;
- /** Redo log */
- static AIO* s_log;
-
- /** Reads */
- static AIO* s_reads;
-
- /** Writes */
- static AIO* s_writes;
-
- /** Synchronous I/O */
- static AIO* s_sync;
-};
-
-/** Static declarations */
-AIO* AIO::s_reads;
-AIO* AIO::s_writes;
-AIO* AIO::s_ibuf;
-AIO* AIO::s_log;
-AIO* AIO::s_sync;
-
-#if defined(LINUX_NATIVE_AIO)
-/** timeout for each io_getevents() call = 500ms. */
-static const ulint OS_AIO_REAP_TIMEOUT = 500000000UL;
-
-/** time to sleep, in microseconds if io_setup() returns EAGAIN. */
-static const ulint OS_AIO_IO_SETUP_RETRY_SLEEP = 500000UL;
-
-/** number of attempts before giving up on io_setup(). */
-static const int OS_AIO_IO_SETUP_RETRY_ATTEMPTS = 5;
-#endif /* LINUX_NATIVE_AIO */
-
-/** Array of events used in simulated AIO */
-static os_event_t* os_aio_segment_wait_events;
+/** Flag indicating if the page_cleaner is in active state. */
+extern bool buf_page_cleaner_is_active;
-/** Number of asynchronous I/O segments. Set by os_aio_init(). */
-static ulint os_aio_n_segments = ULINT_UNDEFINED;
+#ifdef WITH_INNODB_DISALLOW_WRITES
+#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event)
+#else
+#define WAIT_ALLOW_WRITES() do { } while (0)
+#endif /* WITH_INNODB_DISALLOW_WRITES */
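
A usage sketch of the macro above: write paths call it before modifying
files, so that in builds configured WITH_INNODB_DISALLOW_WRITES the caller
blocks until srv_allow_writes_event is signalled, while in regular builds
it compiles to nothing:

	static void write_path_sketch()
	{
		/* No-op unless WITH_INNODB_DISALLOW_WRITES is defined. */
		WAIT_ALLOW_WRITES();

		/* ... proceed to create or modify the file ... */
	}
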
-/** If the following is true, read i/o handler threads try to
-wait until a batch of new read requests have been posted */
-static bool os_aio_recommend_sleep_for_read_threads;
ulint os_n_file_reads;
static ulint os_bytes_read_since_printout;
@@ -688,11 +168,12 @@ bool os_has_said_disk_full;
/** Default Zip compression level */
extern uint page_zip_level;
-/** Validates the consistency of the aio system.
-@return true if ok */
-static
-bool
-os_aio_validate();
+#ifdef UNIV_PFS_IO
+/* Keys to register InnoDB I/O with performance schema */
+mysql_pfs_key_t innodb_data_file_key;
+mysql_pfs_key_t innodb_log_file_key;
+mysql_pfs_key_t innodb_temp_file_key;
+#endif
/** Handle errors for file operations.
@param[in] name name of a file or NULL
@@ -756,30 +237,8 @@ static void os_file_handle_rename_error(const char* name, const char* new_name)
}
}
-/** Does simulated AIO. This function should be called by an i/o-handler
-thread.
-
-@param[in] segment The number of the segment in the aio arrays to wait
- for; segment 0 is the ibuf i/o thread, segment 1 the
- log i/o thread, then follow the non-ibuf read threads,
- and as the last are the non-ibuf write threads
-@param[out] m1 the messages passed with the AIO request; note that
- also in the case where the AIO operation failed, these
- output parameters are valid and can be used to restart
- the operation, for example
-@param[out] m2 Callback argument
-@param[in] type IO context
-@return DB_SUCCESS or error code */
-static
-dberr_t
-os_aio_simulated_handler(
- ulint global_segment,
- fil_node_t** m1,
- void** m2,
- IORequest* type);
#ifdef _WIN32
-static HANDLE win_get_syncio_event();
/**
Wrapper around Windows DeviceIoControl() function.
@@ -803,7 +262,7 @@ os_win32_device_io_control(
)
{
OVERLAPPED overlapped = { 0 };
- overlapped.hEvent = win_get_syncio_event();
+ overlapped.hEvent = tpool::win_get_syncio_event();
BOOL result = DeviceIoControl(handle, code, inbuf, inbuf_size, outbuf,
outbuf_size, NULL, &overlapped);
@@ -817,47 +276,7 @@ os_win32_device_io_control(
#endif
-#ifdef WIN_ASYNC_IO
-/** This function is only used in Windows asynchronous i/o.
-Waits for an aio operation to complete. This function is used to wait for
-the completed requests. The aio array of pending requests is divided
-into segments. The thread specifies which segment or slot it wants to wait
-for. NOTE: this function will also take care of freeing the aio slot,
-therefore no other thread is allowed to do the freeing!
-@param[in] segment The number of the segment in the aio arrays to
-wait for; segment 0 is the ibuf I/O thread,
-segment 1 the log I/O thread, then follow the
-non-ibuf read threads, and as the last are the
-non-ibuf write threads; if this is
-ULINT_UNDEFINED, then it means that sync AIO
-is used, and this parameter is ignored
-@param[in] pos this parameter is used only in sync AIO:
-wait for the aio slot at this position
-@param[out] m1 the messages passed with the AIO request; note
-that also in the case where the AIO operation
-failed, these output parameters are valid and
-can be used to restart the operation,
-for example
-@param[out] m2 callback message
-@param[out] type OS_FILE_WRITE or ..._READ
-@return DB_SUCCESS or error code */
-static
-dberr_t
-os_aio_windows_handler(
- ulint segment,
- ulint pos,
- fil_node_t** m1,
- void** m2,
- IORequest* type);
-#endif /* WIN_ASYNC_IO */
-/** Generic AIO Handler methods. Currently handles IO post processing. */
-class AIOHandler {
-public:
- /** Do any post processing after a read/write
- @return DB_SUCCESS or error code. */
- static dberr_t post_io_processing(Slot* slot);
-};
/** Helper class for doing synchronous file IO. Currently, the objective
is to hide the OS specific code, so that the higher level functions aren't
@@ -890,10 +309,7 @@ public:
@return the number of bytes read/written or negative value on error */
ssize_t execute(const IORequest& request);
- /** Do the read/write
- @param[in,out] slot The IO slot, it has the IO context
- @return the number of bytes read/written or negative value on error */
- static ssize_t execute(Slot* slot);
+
/** Move the read/write offset up to where the partial IO succeeded.
@param[in] n_bytes The number of bytes to advance */
@@ -922,67 +338,6 @@ private:
os_offset_t m_offset;
};
-/** Do any post processing after a read/write
-@return DB_SUCCESS or error code. */
-dberr_t
-AIOHandler::post_io_processing(Slot* slot)
-{
- ut_ad(slot->is_reserved);
-
- /* Total bytes read so far */
- ulint n_bytes = ulint(slot->ptr - slot->buf) + slot->n_bytes;
-
- return(n_bytes == slot->original_len ? DB_SUCCESS : DB_FAIL);
-}
-
-/** Count the number of free slots
-@return number of reserved slots */
-ulint
-AIO::pending_io_count() const
-{
- acquire();
-
-#ifdef UNIV_DEBUG
- ut_a(m_n_segments > 0);
- ut_a(!m_slots.empty());
-
- ulint count = 0;
-
- for (ulint i = 0; i < m_slots.size(); ++i) {
-
- const Slot& slot = m_slots[i];
-
- if (slot.is_reserved) {
- ++count;
- ut_a(slot.len > 0);
- }
- }
-
- ut_a(m_n_reserved == count);
-#endif /* UNIV_DEBUG */
-
- ulint reserved = m_n_reserved;
-
- release();
-
- return(reserved);
-}
-
-#ifdef UNIV_DEBUG
-/** Validates the consistency of the aio system some of the time.
-@return true if ok or the check was skipped */
-static
-bool
-os_aio_validate_skip()
-{
-/** Try os_aio_validate() every this many times */
-# define OS_AIO_VALIDATE_SKIP 13
-
- static Atomic_counter<uint32_t> os_aio_validate_count;
- return (os_aio_validate_count++ % OS_AIO_VALIDATE_SKIP) || os_aio_validate();
-}
-#endif /* UNIV_DEBUG */
-
#undef USE_FILE_LOCK
#ifndef _WIN32
/* On Windows, mandatory locking is used */
@@ -1026,101 +381,6 @@ os_file_lock(
}
#endif /* USE_FILE_LOCK */
-/** Calculates local segment number and aio array from global segment number.
-@param[out] array aio wait array
-@param[in] segment global segment number
-@return local segment number within the aio array */
-ulint
-AIO::get_array_and_local_segment(
- AIO** array,
- ulint segment)
-{
- ulint local_segment;
- ulint n_extra_segs = (srv_read_only_mode) ? 0 : 2;
-
- ut_a(segment < os_aio_n_segments);
-
- if (!srv_read_only_mode && segment < n_extra_segs) {
-
- /* We don't support ibuf/log IO during read only mode. */
-
- if (segment == IO_IBUF_SEGMENT) {
-
- *array = s_ibuf;
-
- } else if (segment == IO_LOG_SEGMENT) {
-
- *array = s_log;
-
- } else {
- *array = NULL;
- }
-
- local_segment = 0;
-
- } else if (segment < s_reads->m_n_segments + n_extra_segs) {
-
- *array = s_reads;
- local_segment = segment - n_extra_segs;
-
- } else {
- *array = s_writes;
-
- local_segment = segment
- - (s_reads->m_n_segments + n_extra_segs);
- }
-
- return(local_segment);
-}
-
-/** Frees a slot in the aio array. Assumes caller owns the mutex.
-@param[in,out] slot Slot to release */
-void
-AIO::release(Slot* slot)
-{
- ut_ad(is_mutex_owned());
-
- ut_ad(slot->is_reserved);
-
- slot->is_reserved = false;
-
- --m_n_reserved;
-
- if (m_n_reserved == m_slots.size() - 1) {
- os_event_set(m_not_full);
- }
-
- if (m_n_reserved == 0) {
- os_event_set(m_is_empty);
- }
-
-#if defined(LINUX_NATIVE_AIO)
-
- if (srv_use_native_aio) {
- memset(&slot->control, 0x0, sizeof(slot->control));
- slot->ret = 0;
- slot->n_bytes = 0;
- } else {
- /* These fields should not be used if we are not
- using native AIO. */
- ut_ad(slot->n_bytes == 0);
- ut_ad(slot->ret == 0);
- }
-
-#endif /* LINUX_NATIVE_AIO */
-}
-
-/** Frees a slot in the AIO array. Assumes caller doesn't own the mutex.
-@param[in,out] slot Slot to release */
-void
-AIO::release_with_mutex(Slot* slot)
-{
- acquire();
-
- release(slot);
-
- release();
-}
/** Create a temporary file. This function is like tmpfile(3), but
the temporary file is created in the mysql server configuration
@@ -1464,7 +724,7 @@ os_file_create_subdirs_if_needed(
return(success ? DB_SUCCESS : DB_ERROR);
}
-#ifndef _WIN32
+
/** Do the read/write
@param[in] request The IO context and type
@@ -1475,14 +735,24 @@ SyncFileIO::execute(const IORequest& request)
ssize_t n_bytes;
if (request.is_read()) {
+#ifdef _WIN32
+ n_bytes = tpool::pread(m_fh, m_buf, m_n, m_offset);
+#else
n_bytes = pread(m_fh, m_buf, m_n, m_offset);
+#endif
} else {
ut_ad(request.is_write());
+#ifdef _WIN32
+ n_bytes = tpool::pwrite(m_fh, m_buf, m_n, m_offset);
+#else
n_bytes = pwrite(m_fh, m_buf, m_n, m_offset);
+#endif
}
return(n_bytes);
}
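
SyncFileIO::execute() can transfer fewer bytes than requested; its callers
loop, advancing past the bytes already done, up to
NUM_RETRIES_ON_PARTIAL_IO attempts. A condensed sketch of that pattern,
assuming a SyncFileIO(fh, buf, n, offset) constructor and an
advance(n_bytes) method matching the declarations above:

static ssize_t sync_io_retry_sketch(os_file_t fh, const IORequest& request,
				    void* buf, ulint n, os_offset_t offset)
{
	SyncFileIO io(fh, buf, n, offset);	/* ctor signature assumed */
	ulint total = 0;

	for (ulint i = 0; i < NUM_RETRIES_ON_PARTIAL_IO; ++i) {
		ssize_t n_bytes = io.execute(request);

		if (n_bytes < 0) {
			return -1;	/* hard error, not retried */
		}

		total += ulint(n_bytes);

		if (total == n) {
			break;		/* fully transferred */
		}

		/* Partial IO: move the buffer pointer and the file
		offset past the bytes that succeeded, then retry. */
		io.advance(ulint(n_bytes));
	}

	return ssize_t(total);
}
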
+
+#ifndef _WIN32
/** Free storage space associated with a section of the file.
@param[in] fh Open file handle
@param[in] off Starting offset (SEEK_SET)
@@ -1526,732 +796,7 @@ os_file_punch_hole_posix(
return(DB_IO_NO_PUNCH_HOLE);
}
-#if defined(LINUX_NATIVE_AIO)
-
-/** Linux native AIO handler */
-class LinuxAIOHandler {
-public:
- /**
- @param[in] global_segment The global segment*/
- LinuxAIOHandler(ulint global_segment)
- :
- m_global_segment(global_segment)
- {
- /* Should never be doing Sync IO here. */
- ut_a(m_global_segment != ULINT_UNDEFINED);
-
- /* Find the array and the local segment. */
-
- m_segment = AIO::get_array_and_local_segment(
- &m_array, m_global_segment);
- m_n_slots = m_array->slots_per_segment();
- }
-
- /** Destructor */
- ~LinuxAIOHandler()
- {
- // No op
- }
-
- /**
- Process a Linux AIO request
- @param[out] m1 the messages passed with the
- @param[out] m2 AIO request; note that in case the
- AIO operation failed, these output
- parameters are valid and can be used to
- restart the operation.
- @param[out] request IO context
- @return DB_SUCCESS or error code */
- dberr_t poll(fil_node_t** m1, void** m2, IORequest* request);
-
-private:
- /** Resubmit an IO request that was only partially successful
- @param[in,out] slot Request to resubmit
- @return DB_SUCCESS or DB_FAIL if the IO resubmit request failed */
- dberr_t resubmit(Slot* slot);
-
- /** Check if the AIO succeeded
- @param[in,out] slot The slot to check
- @return DB_SUCCESS, DB_FAIL if the operation should be retried or
- DB_IO_ERROR on all other errors */
- dberr_t check_state(Slot* slot);
-
- /** @return true if a shutdown was detected */
- bool is_shutdown() const
- {
- return(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
- && !buf_page_cleaner_is_active);
- }
-
- /** If no slot was found then the m_array->m_mutex will be released.
- @param[out] n_pending The number of pending IOs
- @return NULL or a slot that has completed IO */
- Slot* find_completed_slot(ulint* n_pending);
-
- /** This is called from within the IO-thread. If there are no completed
- IO requests in the slot array, the thread calls this function to
- collect more requests from the Linux kernel.
- The IO-thread waits on io_getevents(), which is a blocking call, with
- a timeout value. Unless the system is very heavily loaded, keeping the
- IO-thread very busy, the io-thread will spend most of its time waiting
- in this function.
- The IO-thread also exits in this function. It checks server status at
- each wakeup and that is why we use timed wait in io_getevents(). */
- void collect();
-
-private:
- /** Slot array */
- AIO* m_array;
-
- /** Number of slots in the local segment */
- ulint m_n_slots;
-
- /** The local segment to check */
- ulint m_segment;
-
- /** The global segment */
- ulint m_global_segment;
-};
-
-/** Resubmit an IO request that was only partially successful
-@param[in,out] slot Request to resubmit
-@return DB_SUCCESS or DB_FAIL if the IO resubmit request failed */
-dberr_t
-LinuxAIOHandler::resubmit(Slot* slot)
-{
-#ifdef UNIV_DEBUG
- /* Bytes already read/written out */
- ulint n_bytes = slot->ptr - slot->buf;
-
- ut_ad(m_array->is_mutex_owned());
-
- ut_ad(n_bytes < slot->original_len);
- ut_ad(static_cast<ulint>(slot->n_bytes) < slot->original_len - n_bytes);
- /* Partial read or write scenario */
- ut_ad(slot->len >= static_cast<ulint>(slot->n_bytes));
-#endif /* UNIV_DEBUG */
-
- slot->len -= slot->n_bytes;
- slot->ptr += slot->n_bytes;
- slot->offset += slot->n_bytes;
-
- /* Resetting the bytes read/written */
- slot->n_bytes = 0;
- slot->io_already_done = false;
-
- compile_time_assert(sizeof(off_t) >= sizeof(os_offset_t));
-
- struct iocb* iocb = &slot->control;
-
- if (slot->type.is_read()) {
-
- io_prep_pread(
- iocb,
- slot->file,
- slot->ptr,
- slot->len,
- slot->offset);
- } else {
-
- ut_a(slot->type.is_write());
-
- io_prep_pwrite(
- iocb,
- slot->file,
- slot->ptr,
- slot->len,
- slot->offset);
- }
-
- iocb->data = slot;
-
- /* Resubmit an I/O request */
- int ret = io_submit(m_array->io_ctx(m_segment), 1, &iocb);
- srv_stats.buffered_aio_submitted.inc();
-
- if (ret < -1) {
- errno = -ret;
- }
-
- return(ret < 0 ? DB_IO_PARTIAL_FAILED : DB_SUCCESS);
-}
-
-/** Check if the AIO succeeded
-@param[in,out] slot The slot to check
-@return DB_SUCCESS, DB_FAIL if the operation should be retried or
- DB_IO_ERROR on all other errors */
-dberr_t
-LinuxAIOHandler::check_state(Slot* slot)
-{
- ut_ad(m_array->is_mutex_owned());
-
- /* Note that it may be that there is more than one completed
- IO request. We process them one at a time. We may have a case
- here to improve the performance slightly by dealing with all
- requests in one sweep. */
-
- srv_set_io_thread_op_info(
- m_global_segment, "processing completed aio requests");
-
- ut_ad(slot->io_already_done);
-
- dberr_t err = DB_SUCCESS;
-
- if (slot->ret == 0) {
-
- err = AIOHandler::post_io_processing(slot);
-
- } else {
- errno = -slot->ret;
-
- /* os_file_handle_error does tell us if we should retry
- this IO. As it stands now, we don't do this retry when
- reaping requests from a different context than
- the dispatcher. This non-retry logic is the same for
- Windows and Linux native AIO.
- We should probably look into this to transparently
- re-submit the IO. */
- os_file_handle_error(slot->name, "Linux aio");
-
- err = DB_IO_ERROR;
- }
-
- return(err);
-}
-
-/** If no slot was found then the m_array->m_mutex will be released.
-@param[out] n_pending The number of pending IOs
-@return NULL or a slot that has completed IO */
-Slot*
-LinuxAIOHandler::find_completed_slot(ulint* n_pending)
-{
- ulint offset = m_n_slots * m_segment;
-
- *n_pending = 0;
-
- m_array->acquire();
-
- Slot* slot = m_array->at(offset);
-
- for (ulint i = 0; i < m_n_slots; ++i, ++slot) {
-
- if (slot->is_reserved) {
-
- ++*n_pending;
-
- if (slot->io_already_done) {
-
- /* Something for us to work on.
- Note: We don't release the mutex. */
- return(slot);
- }
- }
- }
-
- m_array->release();
-
- return(NULL);
-}
-
-/** This function is only used in Linux native asynchronous i/o. This is
-called from within the io-thread. If there are no completed IO requests
-in the slot array, the thread calls this function to collect more
-requests from the kernel.
-The io-thread waits on io_getevents(), which is a blocking call, with
-a timeout value. Unless the system is very heavily loaded, keeping the
-io-thread very busy, the io-thread will spend most of its time waiting
-in this function.
-The io-thread also exits in this function. It checks server status at
-each wakeup and that is why we use timed wait in io_getevents(). */
-void
-LinuxAIOHandler::collect()
-{
- ut_ad(m_n_slots > 0);
- ut_ad(m_array != NULL);
- ut_ad(m_segment < m_array->get_n_segments());
-
- /* Which io_context we are going to use. */
- io_context* io_ctx = m_array->io_ctx(m_segment);
-
- /* Starting point of the m_segment we will be working on. */
- ulint start_pos = m_segment * m_n_slots;
-
- /* End point. */
- ulint end_pos = start_pos + m_n_slots;
-
- for (;;) {
- struct io_event* events;
-
- /* Which part of event array we are going to work on. */
- events = m_array->io_events(m_segment * m_n_slots);
-
- /* Initialize the events. */
- memset(events, 0, sizeof(*events) * m_n_slots);
-
- /* The timeout value is arbitrary. We probably need
- to experiment with it a little. */
- struct timespec timeout;
-
- timeout.tv_sec = 0;
- timeout.tv_nsec = OS_AIO_REAP_TIMEOUT;
-
- int ret;
-
- ret = io_getevents(io_ctx, 1, m_n_slots, events, &timeout);
-
- for (int i = 0; i < ret; ++i) {
-
- struct iocb* iocb;
-
- iocb = reinterpret_cast<struct iocb*>(events[i].obj);
- ut_a(iocb != NULL);
-
- Slot* slot = reinterpret_cast<Slot*>(iocb->data);
-
- /* Some sanity checks. */
- ut_a(slot != NULL);
- ut_a(slot->is_reserved);
-
- /* We are not scribbling previous segment. */
- ut_a(slot->pos >= start_pos);
-
- /* We have not overstepped to next segment. */
- ut_a(slot->pos < end_pos);
-
- /* Deallocate unused blocks from the file system.
- This is never done to page 0 or to log files. */
- if (slot->offset > 0
- && !slot->type.is_log()
- && slot->type.is_write()
- && slot->type.punch_hole()) {
-
- slot->err = slot->type.punch_hole(
- slot->file,
- slot->offset, slot->len);
- } else {
- slot->err = DB_SUCCESS;
- }
-
- /* Mark this request as completed. The error handling
- will be done in the calling function. */
- m_array->acquire();
-
- /* events[i].res2 should always be ZERO */
- ut_ad(events[i].res2 == 0);
- slot->io_already_done = true;
-
- /* Even though events[i].res is an unsigned number
- in libaio, it is used to return a negative value
- (negated errno value) to indicate error and a positive
- value to indicate number of bytes read or written. */
-
- if (events[i].res > slot->len) {
- /* failure */
- slot->n_bytes = 0;
- slot->ret = events[i].res;
- } else {
- /* success */
- slot->n_bytes = events[i].res;
- slot->ret = 0;
- }
- m_array->release();
- }
-
- if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
- || !buf_page_cleaner_is_active
- || ret > 0) {
-
- break;
- }
-
- /* This error handling is for any error in collecting the
- IO requests. The errors, if any, for any particular IO
- request are simply passed on to the calling routine. */
-
- switch (ret) {
- case -EAGAIN:
- /* Not enough resources! Try again. */
-
- case -EINTR:
- /* Interrupted! The behaviour in case of an interrupt.
- If we have some completed IOs available then the
- return code will be the number of IOs. We get EINTR
- only if there are no completed IOs and we have been
- interrupted. */
-
- case 0:
- /* No pending request! Go back and check again. */
-
- continue;
- }
-
- /* All other errors should cause a trap for now. */
- ib::fatal()
- << "Unexpected ret_code[" << ret
- << "] from io_getevents()!";
-
- break;
- }
-}
-
-/** Process a Linux AIO request
-@param[out] m1 the messages passed with the
-@param[out] m2 AIO request; note that in case the
- AIO operation failed, these output
- parameters are valid and can be used to
- restart the operation.
-@param[out] request IO context
-@return DB_SUCCESS or error code */
-dberr_t
-LinuxAIOHandler::poll(fil_node_t** m1, void** m2, IORequest* request)
-{
- dberr_t err = DB_SUCCESS;
- Slot* slot;
-
- /* Loop until we have found a completed request. */
- for (;;) {
-
- ulint n_pending;
-
- slot = find_completed_slot(&n_pending);
-
- if (slot != NULL) {
-
- ut_ad(m_array->is_mutex_owned());
-
- err = check_state(slot);
-
- /* DB_FAIL is not a hard error, we should retry */
- if (err != DB_FAIL) {
- break;
- }
-
- /* Partial IO, resubmit request for
- remaining bytes to read/write */
- err = resubmit(slot);
-
- if (err != DB_SUCCESS) {
- break;
- }
-
- m_array->release();
-
- } else if (is_shutdown() && n_pending == 0) {
-
- /* There is no completed request. If there is
- no pending request at all, and the system is
- being shut down, exit. */
-
- *m1 = NULL;
- *m2 = NULL;
-
- return(DB_SUCCESS);
-
- } else {
-
- /* Wait for some request. Note that we return
- from wait if we have found a request. */
-
- srv_set_io_thread_op_info(
- m_global_segment,
- "waiting for completed aio requests");
-
- collect();
- }
- }
-
- if (err == DB_IO_PARTIAL_FAILED) {
- /* Aborting in case of submit failure */
- ib::fatal()
- << "Native Linux AIO interface. "
- "io_submit() call failed when "
- "resubmitting a partial I/O "
- "request on the file " << slot->name
- << ".";
- }
-
- *m1 = slot->m1;
- *m2 = slot->m2;
-
- *request = slot->type;
-
- m_array->release(slot);
-
- m_array->release();
-
- return(err);
-}
-
-/** This function is only used in Linux native asynchronous i/o.
-Waits for an aio operation to complete. This function is used to wait for
-the completed requests. The aio array of pending requests is divided
-into segments. The thread specifies which segment or slot it wants to wait
-for. NOTE: this function will also take care of freeing the aio slot,
-therefore no other thread is allowed to do the freeing!
-
-@param[in] global_seg segment number in the aio array
- to wait for; segment 0 is the ibuf
- i/o thread, segment 1 is log i/o thread,
- then follow the non-ibuf read threads,
- and the last are the non-ibuf write
- threads.
-@param[out] m1 the messages passed with the
-@param[out] m2 AIO request; note that in case the
- AIO operation failed, these output
- parameters are valid and can be used to
- restart the operation.
-@param[out] request IO context
-@return DB_SUCCESS if the IO was successful */
-static
-dberr_t
-os_aio_linux_handler(
- ulint global_segment,
- fil_node_t** m1,
- void** m2,
- IORequest* request)
-{
- return LinuxAIOHandler(global_segment).poll(m1, m2, request);
-}
-
-/** Dispatch an AIO request to the kernel.
-@param[in,out] slot an already reserved slot
-@return true on success. */
-bool
-AIO::linux_dispatch(Slot* slot)
-{
- ut_a(slot->is_reserved);
- ut_ad(slot->type.validate());
-
- /* Find out what we are going to work with.
- The iocb struct is directly in the slot.
- The io_context is one per segment. */
-
- ulint io_ctx_index;
- struct iocb* iocb = &slot->control;
-
- io_ctx_index = (slot->pos * m_n_segments) / m_slots.size();
-
- int ret = io_submit(m_aio_ctx[io_ctx_index], 1, &iocb);
- srv_stats.buffered_aio_submitted.inc();
-
- /* io_submit() returns number of successfully queued requests
- or -errno. */
-
- if (ret != 1) {
- errno = -ret;
- }
-
- return(ret == 1);
-}
-
-/** Creates an io_context for native linux AIO.
-@param[in] max_events number of events
-@param[out] io_ctx io_ctx to initialize.
-@return true on success. */
-bool
-AIO::linux_create_io_ctx(
- unsigned max_events,
- io_context_t* io_ctx)
-{
- ssize_t n_retries = 0;
-
- for (;;) {
-
- memset(io_ctx, 0x0, sizeof(*io_ctx));
-
- /* Initialize the io_ctx. Tell it how many pending
- IO requests this context will handle. */
-
- int ret = io_setup(max_events, io_ctx);
-
- if (ret == 0) {
- /* Success. Return now. */
- return(true);
- }
-
- /* If we hit EAGAIN we'll make a few attempts before failing. */
-
- switch (ret) {
- case -EAGAIN:
- if (n_retries == 0) {
- /* First time around. */
- ib::warn()
- << "io_setup() failed with EAGAIN."
- " Will make "
- << OS_AIO_IO_SETUP_RETRY_ATTEMPTS
- << " attempts before giving up.";
- }
-
- if (n_retries < OS_AIO_IO_SETUP_RETRY_ATTEMPTS) {
-
- ++n_retries;
-
- ib::warn()
- << "io_setup() attempt "
- << n_retries << ".";
-
- os_thread_sleep(OS_AIO_IO_SETUP_RETRY_SLEEP);
-
- continue;
- }
-
- /* Have tried enough. Better call it a day. */
- ib::error()
- << "io_setup() failed with EAGAIN after "
- << OS_AIO_IO_SETUP_RETRY_ATTEMPTS
- << " attempts.";
- break;
-
- case -ENOSYS:
- ib::error()
- << "Linux Native AIO interface"
- " is not supported on this platform. Please"
- " check your OS documentation and install"
- " appropriate binary of InnoDB.";
-
- break;
-
- default:
- ib::error()
- << "Linux Native AIO setup"
- << " returned following error["
- << ret << "]";
- break;
- }
-
- ib::info()
- << "You can disable Linux Native AIO by"
- " setting innodb_use_native_aio = 0 in my.cnf";
-
- break;
- }
-
- return(false);
-}
-
-/** Checks if the system supports native linux aio. On some kernel
-versions where native aio is supported it won't work on tmpfs. In such
-cases we can't use native aio as it is not possible to mix simulated
-and native aio.
-@return: true if supported, false otherwise. */
-bool
-AIO::is_linux_native_aio_supported()
-{
- File fd;
- io_context_t io_ctx;
- char name[1000];
-
- if (!linux_create_io_ctx(1, &io_ctx)) {
-
- /* The platform does not support native aio. */
-
- return(false);
-
- } else if (!srv_read_only_mode) {
-
- /* Now check if tmpdir supports native aio ops. */
- fd = mysql_tmpfile("ib");
-
- if (fd < 0) {
- ib::warn()
- << "Unable to create temp file to check"
- " native AIO support.";
-
- return(false);
- }
- } else {
-
- os_normalize_path(srv_log_group_home_dir);
-
- ulint dirnamelen = strlen(srv_log_group_home_dir);
-
- ut_a(dirnamelen < (sizeof name) - 10 - sizeof "ib_logfile");
-
- memcpy(name, srv_log_group_home_dir, dirnamelen);
-
- /* Add a path separator if needed. */
- if (dirnamelen && name[dirnamelen - 1] != OS_PATH_SEPARATOR) {
-
- name[dirnamelen++] = OS_PATH_SEPARATOR;
- }
-
- strcpy(name + dirnamelen, "ib_logfile0");
-
- fd = my_open(name, O_RDONLY | O_CLOEXEC, MYF(0));
-
- if (fd == -1) {
-
- ib::warn()
- << "Unable to open"
- << " \"" << name << "\" to check native"
- << " AIO read support.";
-
- return(false);
- }
- }
-
- struct io_event io_event;
-
- memset(&io_event, 0x0, sizeof(io_event));
-
- byte* buf = static_cast<byte*>(ut_malloc_nokey(srv_page_size * 2));
- byte* ptr = static_cast<byte*>(ut_align(buf, srv_page_size));
-
- struct iocb iocb;
-
- /* Suppress valgrind warning. */
- memset(buf, 0x00, srv_page_size * 2);
- memset(&iocb, 0x0, sizeof(iocb));
-
- struct iocb* p_iocb = &iocb;
-
- if (!srv_read_only_mode) {
-
- io_prep_pwrite(p_iocb, fd, ptr, srv_page_size, 0);
-
- } else {
- ut_a(srv_page_size >= 512);
- io_prep_pread(p_iocb, fd, ptr, 512, 0);
- }
-
- int err = io_submit(io_ctx, 1, &p_iocb);
- srv_stats.buffered_aio_submitted.inc();
-
- if (err >= 1) {
- /* Now collect the submitted IO request. */
- err = io_getevents(io_ctx, 1, 1, &io_event, NULL);
- }
-
- ut_free(buf);
- my_close(fd, MYF(MY_WME));
-
- switch (err) {
- case 1:
- return(true);
-
- case -EINVAL:
- case -ENOSYS:
- ib::error()
- << "Linux Native AIO not supported. You can either"
- " move "
- << (srv_read_only_mode ? name : "tmpdir")
- << " to a file system that supports native"
- " AIO or you can set innodb_use_native_aio to"
- " FALSE to avoid this message.";
-
- /* fall through. */
- default:
- ib::error()
- << "Linux Native AIO check on "
- << (srv_read_only_mode ? name : "tmpdir")
- << "returned error[" << -err << "]";
- }
-
- return(false);
-}
-
-#endif /* LINUX_NATIVE_AIO */
/** Retrieves the last error number if an error occurs in a file io function.
The number should be retrieved before any other OS calls (because they may
@@ -3283,127 +1828,6 @@ os_file_set_eof(
#include <WinIoCtl.h>
-/*
-Windows : Handling synchronous IO on files opened asynchronously.
-
-If file is opened for asynchronous IO (FILE_FLAG_OVERLAPPED) and also bound to
-a completion port, then every IO on this file would normally be enqueued to the
-completion port. Sometimes however we would like to do a synchronous IO. This is
-possible if we initialize overlapped.hEvent with a valid event and set its
-lowest order bit to 1 (see MSDN ReadFile and WriteFile description for more info)
-
-We'll create this special event once for each thread and store it in
-storage.
-*/
-
-
-static void __stdcall win_free_syncio_event(void *data) {
- if (data) {
- CloseHandle((HANDLE)data);
- }
-}
-
-
-/*
-Retrieve per-thread event for doing synchronous io on asynchronously opened files
-*/
-static HANDLE win_get_syncio_event()
-{
- HANDLE h;
-
- h = (HANDLE)FlsGetValue(fls_sync_io);
- if (h) {
- return h;
- }
- h = CreateEventA(NULL, FALSE, FALSE, NULL);
- ut_a(h);
- /* Set low-order bit to keep I/O completion from being queued */
- h = (HANDLE)((uintptr_t)h | 1);
- FlsSetValue(fls_sync_io, h);
- return h;
-}
-
-
-/** Do the read/write
-@param[in] request The IO context and type
-@return the number of bytes read/written or negative value on error */
-ssize_t
-SyncFileIO::execute(const IORequest& request)
-{
- OVERLAPPED seek;
-
- memset(&seek, 0x0, sizeof(seek));
-
- seek.hEvent = win_get_syncio_event();
- seek.Offset = (DWORD) m_offset & 0xFFFFFFFF;
- seek.OffsetHigh = (DWORD) (m_offset >> 32);
-
- BOOL ret;
- DWORD n_bytes;
-
- if (request.is_read()) {
- ret = ReadFile(m_fh, m_buf,
- static_cast<DWORD>(m_n), NULL, &seek);
-
- } else {
- ut_ad(request.is_write());
- ret = WriteFile(m_fh, m_buf,
- static_cast<DWORD>(m_n), NULL, &seek);
- }
- if (ret || (GetLastError() == ERROR_IO_PENDING)) {
- /* Wait for async io to complete */
- ret = GetOverlappedResult(m_fh, &seek, &n_bytes, TRUE);
- }
-
- return(ret ? static_cast<ssize_t>(n_bytes) : -1);
-}
-
-/** Do the read/write
-@param[in,out] slot The IO slot, it has the IO context
-@return the number of bytes read/written or negative value on error */
-ssize_t
-SyncFileIO::execute(Slot* slot)
-{
- BOOL ret;
- slot->control.hEvent = win_get_syncio_event();
- if (slot->type.is_read()) {
-
- ret = ReadFile(
- slot->file, slot->ptr, slot->len,
- NULL, &slot->control);
-
- } else {
- ut_ad(slot->type.is_write());
-
- ret = WriteFile(
- slot->file, slot->ptr, slot->len,
- NULL, &slot->control);
-
- }
- if (ret || (GetLastError() == ERROR_IO_PENDING)) {
- /* Wait for async io to complete */
- ret = GetOverlappedResult(slot->file, &slot->control, &slot->n_bytes, TRUE);
- }
-
- return(ret ? static_cast<ssize_t>(slot->n_bytes) : -1);
-}
-
-/* Startup/shutdown */
-
-struct WinIoInit
-{
- WinIoInit() {
- fls_sync_io = FlsAlloc(win_free_syncio_event);
- ut_a(fls_sync_io != FLS_OUT_OF_INDEXES);
- }
-
- ~WinIoInit() {
- FlsFree(fls_sync_io);
- }
-};
-
-/* Ensures proper initialization and shutdown */
-static WinIoInit win_io_init;
/** Free storage space associated with a section of the file.
@@ -4167,18 +2591,9 @@ os_file_create_func(
}
}
- if (*success && srv_use_native_aio && (attributes & FILE_FLAG_OVERLAPPED)) {
- /* Bind the file handle to completion port. Completion port
- might not be created yet, in some stages of backup, but
- must always be there for the server.*/
- HANDLE port = (type == OS_LOG_FILE) ?
- log_completion_port : data_completion_port;
- ut_a(port || srv_operation != SRV_OPERATION_NORMAL);
- if (port) {
- ut_a(CreateIoCompletionPort(file, port, 0, 0));
- }
+ if (*success && (attributes & FILE_FLAG_OVERLAPPED) && srv_thread_pool) {
+ srv_thread_pool->bind(file);
}
-
return(file);
}
@@ -4438,15 +2853,18 @@ bool
os_file_close_func(
os_file_t file)
{
- ut_a(file);
+ ut_a(file != 0);
- if (CloseHandle(file)) {
- return(true);
- }
- os_file_handle_error(NULL, "close");
+ if (!CloseHandle(file)) {
+ os_file_handle_error(NULL, "close");
+ return false;
+ }
- return(false);
+ if (srv_thread_pool)
+ srv_thread_pool->unbind(file);
+
+ return(true);
}
/** Gets a file size.
@@ -4943,17 +3361,19 @@ os_file_read_page(
if (ulint(n_bytes) == n || (err != DB_SUCCESS && !exit_on_err)) {
return err;
}
-
- ib::error() << "Tried to read " << n << " bytes at offset "
- << offset << ", but was only able to read " << n_bytes;
+ int os_err = IF_WIN((int)GetLastError(), errno);
if (!os_file_handle_error_cond_exit(
NULL, "read", exit_on_err, false)) {
ib::fatal()
- << "Cannot read from file. OS error number "
- << errno << ".";
+ << "Tried to read " << n << " bytes at offset "
+ << offset << ", but was only able to read " << n_bytes
+ << ". Cannot read from file. OS error number "
+ << os_err << ".";
+ } else {
+ ib::error() << "Tried to read " << n << " bytes at offset "
+ << offset << ", but was only able to read " << n_bytes;
}
-
if (err == DB_SUCCESS) {
err = DB_IO_ERROR;
}
@@ -5478,979 +3898,201 @@ os_file_get_status(
return(ret);
}
-/**
-Waits for an AIO operation to complete. This function is used to wait for
-the completed requests. The aio array of pending requests is divided
-into segments. The thread specifies which segment or slot it wants to wait
-for. NOTE: this function will also take care of freeing the aio slot,
-therefore no other thread is allowed to do the freeing!
-@param[in] segment The number of the segment in the aio arrays to
- wait for; segment 0 is the ibuf I/O thread,
- segment 1 the log I/O thread, then follow the
- non-ibuf read threads, and as the last are the
- non-ibuf write threads; if this is
- ULINT_UNDEFINED, then it means that sync AIO
- is used, and this parameter is ignored
-@param[out] m1 the messages passed with the AIO request; note
- that also in the case where the AIO operation
- failed, these output parameters are valid and
- can be used to restart the operation,
- for example
-@param[out] m2 callback message
-@param[out] type OS_FILE_WRITE or ..._READ
-@return DB_SUCCESS or error code */
-dberr_t
-os_aio_handler(
- ulint segment,
- fil_node_t** m1,
- void** m2,
- IORequest* request)
-{
- dberr_t err;
-
- if (srv_use_native_aio) {
- srv_set_io_thread_op_info(segment, "native aio handle");
-
-#ifdef WIN_ASYNC_IO
-
- err = os_aio_windows_handler(segment, 0, m1, m2, request);
-
-#elif defined(LINUX_NATIVE_AIO)
-
- err = os_aio_linux_handler(segment, m1, m2, request);
-
-#else
- ut_error;
-
- err = DB_ERROR; /* Eliminate compiler warning */
-
-#endif /* WIN_ASYNC_IO */
-
- } else {
- srv_set_io_thread_op_info(segment, "simulated aio handle");
-
- err = os_aio_simulated_handler(segment, m1, m2, request);
- }
-
- return(err);
-}
-
-#ifdef WIN_ASYNC_IO
-static HANDLE new_completion_port()
-{
- HANDLE h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
- ut_a(h);
- return h;
-}
-#endif
-
-/** Constructor
-@param[in] id The latch ID
-@param[in] n Number of AIO slots
-@param[in] segments Number of segments */
-AIO::AIO(
- latch_id_t id,
- ulint n,
- ulint segments)
- :
-#ifdef WIN_ASYNC_IO
- m_completion_port(new_completion_port()),
-#endif
- m_slots(n),
- m_n_segments(segments),
- m_n_reserved()
-# ifdef LINUX_NATIVE_AIO
- ,m_aio_ctx(),
- m_events(m_slots.size())
-# endif /* LINUX_NATIVE_AIO */
-{
- ut_a(n > 0);
- ut_a(m_n_segments > 0);
-
- mutex_create(id, &m_mutex);
-
- m_not_full = os_event_create("aio_not_full");
- m_is_empty = os_event_create("aio_is_empty");
-
- memset((void*)&m_slots[0], 0x0, sizeof(m_slots[0]) * m_slots.size());
-#ifdef LINUX_NATIVE_AIO
- memset(&m_events[0], 0x0, sizeof(m_events[0]) * m_events.size());
-#endif /* LINUX_NATIVE_AIO */
-
- os_event_set(m_is_empty);
-}
-
-/** Initialise the slots */
-dberr_t
-AIO::init_slots()
-{
- for (ulint i = 0; i < m_slots.size(); ++i) {
- Slot& slot = m_slots[i];
-
- slot.pos = static_cast<uint16_t>(i);
-
- slot.is_reserved = false;
-#ifdef WIN_ASYNC_IO
-
- slot.array = this;
-
-#elif defined(LINUX_NATIVE_AIO)
-
- slot.ret = 0;
-
- slot.n_bytes = 0;
-
- memset(&slot.control, 0x0, sizeof(slot.control));
-
-#endif /* WIN_ASYNC_IO */
- }
-
- return(DB_SUCCESS);
-}
+extern void fil_aio_callback(const tpool::aiocb *cb);
-#ifdef LINUX_NATIVE_AIO
-/** Initialise the Linux Native AIO interface */
-dberr_t
-AIO::init_linux_native_aio()
+static void io_callback(tpool::aiocb* cb)
{
- /* Initialize the io_context array. One io_context
- per segment in the array. */
-
- ut_a(m_aio_ctx == NULL);
-
- m_aio_ctx = static_cast<io_context**>(
- ut_zalloc_nokey(m_n_segments * sizeof(*m_aio_ctx)));
-
- if (m_aio_ctx == NULL) {
- return(DB_OUT_OF_MEMORY);
- }
+ fil_aio_callback(cb);
- io_context** ctx = m_aio_ctx;
- ulint max_events = slots_per_segment();
-
- for (ulint i = 0; i < m_n_segments; ++i, ++ctx) {
-
- if (!linux_create_io_ctx(max_events, ctx)) {
- /* If something bad happened during aio setup
- we disable linux native aio.
- The disadvantage will be a small memory leak
- at shutdown but that's ok compared to a crash
- or a not working server.
- This frequently happens when running the test suite
- with many threads on a system with low fs.aio-max-nr!
- */
-
- ib::warn()
- << "Warning: Linux Native AIO disabled "
- << "because _linux_create_io_ctx() "
- << "failed. To get rid of this warning you can "
- << "try increasing system "
- << "fs.aio-max-nr to 1048576 or larger or "
- << "setting innodb_use_native_aio = 0 in my.cnf";
- ut_free(m_aio_ctx);
- m_aio_ctx = 0;
- srv_use_native_aio = FALSE;
- return(DB_SUCCESS);
+ /* Return cb back to the cache */
+ if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD) {
+ if (read_slots->contains(cb)) {
+ read_slots->release(cb);
+ } else {
+ ut_ad(ibuf_slots->contains(cb));
+ ibuf_slots->release(cb);
}
+ } else {
+ ut_ad(write_slots->contains(cb));
+ write_slots->release(cb);
}
-
- return(DB_SUCCESS);
}
-#endif /* LINUX_NATIVE_AIO */
-
-/** Initialise the array */
-dberr_t
-AIO::init()
-{
- ut_a(!m_slots.empty());
-
- if (srv_use_native_aio) {
#ifdef LINUX_NATIVE_AIO
- dberr_t err = init_linux_native_aio();
-
- if (err != DB_SUCCESS) {
- return(err);
- }
-
-#endif /* LINUX_NATIVE_AIO */
- }
-
- return(init_slots());
-}
-
-/** Creates an aio wait array. Note that we return NULL in case of failure.
-We don't care about freeing memory here because we assume that a
-failure will result in server refusing to start up.
-@param[in] id Latch ID
-@param[in] n maximum number of pending AIO operations
- allowed; n must be divisible by m_n_segments
-@param[in] n_segments number of segments in the AIO array
-@return own: AIO array, NULL on failure */
-AIO*
-AIO::create(
- latch_id_t id,
- ulint n,
- ulint n_segments)
-{
- if ((n % n_segments)) {
-
- ib::error()
- << "Maximum number of AIO operations must be "
- << "divisible by number of segments";
-
- return(NULL);
- }
-
- AIO* array = UT_NEW_NOKEY(AIO(id, n, n_segments));
-
- if (array != NULL && array->init() != DB_SUCCESS) {
-
- UT_DELETE(array);
-
- array = NULL;
- }
-
- return(array);
-}
-
-/** AIO destructor */
-AIO::~AIO()
-{
- mutex_destroy(&m_mutex);
-
- os_event_destroy(m_not_full);
- os_event_destroy(m_is_empty);
-
-#if defined(LINUX_NATIVE_AIO)
- if (srv_use_native_aio) {
- m_events.clear();
- ut_free(m_aio_ctx);
- }
-#endif /* LINUX_NATIVE_AIO */
-#if defined(WIN_ASYNC_IO)
- CloseHandle(m_completion_port);
-#endif
-
- m_slots.clear();
-}
+#include <libaio.h>
+
+/** Checks if the system supports native Linux aio. On some kernel
+versions where native aio is supported, it does not work on tmpfs. In
+such cases we cannot use native aio.
-/** Initializes the asynchronous io system. Creates one array each for ibuf
-and log i/o. Also creates one array each for read and write where each
-array is divided logically into n_readers and n_writers
-respectively. The caller must create an i/o handler thread for each
-segment in these arrays. This function also creates the sync array.
-No i/o handler thread needs to be created for that
-@param[in] n_per_seg maximum number of pending aio
- operations allowed per segment
-@param[in] n_readers number of reader threads
-@param[in] n_writers number of writer threads
-@param[in] n_slots_sync number of slots in the sync aio array
-@return true if the AIO sub-system was started successfully */
-bool
-AIO::start(
- ulint n_per_seg,
- ulint n_readers,
- ulint n_writers,
- ulint n_slots_sync)
+@return true if supported, false otherwise. */
+static bool is_linux_native_aio_supported()
{
-#if defined(LINUX_NATIVE_AIO)
- /* Check if native aio is supported on this system and tmpfs */
- if (srv_use_native_aio && !is_linux_native_aio_supported()) {
-
- ib::warn() << "Linux Native AIO disabled.";
-
- srv_use_native_aio = FALSE;
- }
-#endif /* LINUX_NATIVE_AIO */
+ File fd;
+ io_context_t io_ctx;
+ char name[1000];
- srv_reset_io_thread_op_info();
+ memset(&io_ctx, 0, sizeof(io_ctx));
+ if (io_setup(1, &io_ctx)) {
- s_reads = create(
- LATCH_ID_OS_AIO_READ_MUTEX, n_readers * n_per_seg, n_readers);
+ /* The platform does not support native aio. */
- if (s_reads == NULL) {
return(false);
- }
-
- ulint start = srv_read_only_mode ? 0 : 2;
- ulint n_segs = n_readers + start;
- /* 0 is the ibuf segment and 1 is the redo log segment. */
- for (ulint i = start; i < n_segs; ++i) {
- ut_a(i < SRV_MAX_N_IO_THREADS);
- srv_io_thread_function[i] = "read thread";
}
+ else if (!srv_read_only_mode) {
- ulint n_segments = n_readers;
-
- if (!srv_read_only_mode) {
-
- s_ibuf = create(LATCH_ID_OS_AIO_IBUF_MUTEX, n_per_seg, 1);
-
- if (s_ibuf == NULL) {
- return(false);
- }
-
- ++n_segments;
-
- srv_io_thread_function[0] = "insert buffer thread";
+ /* Now check if tmpdir supports native aio ops. */
+ fd = mysql_tmpfile("ib");
- s_log = create(LATCH_ID_OS_AIO_LOG_MUTEX, n_per_seg, 1);
+ if (fd < 0) {
+ ib::warn()
+ << "Unable to create temp file to check"
+ " native AIO support.";
- if (s_log == NULL) {
return(false);
}
-
- ++n_segments;
-
- srv_io_thread_function[1] = "log thread";
-
- } else {
- s_ibuf = s_log = NULL;
- }
-
- s_writes = create(
- LATCH_ID_OS_AIO_WRITE_MUTEX, n_writers * n_per_seg, n_writers);
-
- if (s_writes == NULL) {
- return(false);
}
+ else {
-#ifdef WIN_ASYNC_IO
- data_completion_port = s_writes->m_completion_port;
- log_completion_port =
- s_log ? s_log->m_completion_port : data_completion_port;
-#endif
-
- n_segments += n_writers;
-
- for (ulint i = start + n_readers; i < n_segments; ++i) {
- ut_a(i < SRV_MAX_N_IO_THREADS);
- srv_io_thread_function[i] = "write thread";
- }
-
- ut_ad(n_segments >= static_cast<ulint>(srv_read_only_mode ? 2 : 4));
-
- s_sync = create(LATCH_ID_OS_AIO_SYNC_MUTEX, n_slots_sync, 1);
-
- if (s_sync == NULL) {
-
- return(false);
- }
-
- os_aio_n_segments = n_segments;
-
- os_aio_validate();
-
- os_last_printout = time(NULL);
-
- if (srv_use_native_aio) {
- return(true);
- }
-
- os_aio_segment_wait_events = static_cast<os_event_t*>(
- ut_zalloc_nokey(
- n_segments * sizeof *os_aio_segment_wait_events));
-
- if (os_aio_segment_wait_events == NULL) {
-
- return(false);
- }
-
- for (ulint i = 0; i < n_segments; ++i) {
- os_aio_segment_wait_events[i] = os_event_create(0);
- }
-
- return(true);
-}
-
-/** Free the AIO arrays */
-void
-AIO::shutdown()
-{
- UT_DELETE(s_ibuf);
- s_ibuf = NULL;
-
- UT_DELETE(s_log);
- s_log = NULL;
-
- UT_DELETE(s_writes);
- s_writes = NULL;
-
- UT_DELETE(s_sync);
- s_sync = NULL;
-
- UT_DELETE(s_reads);
- s_reads = NULL;
-}
+ os_normalize_path(srv_log_group_home_dir);
-/** Initializes the asynchronous io system. Creates one array each for ibuf
-and log i/o. Also creates one array each for read and write where each
-array is divided logically into n_readers and n_writers
-respectively. The caller must create an i/o handler thread for each
-segment in these arrays. This function also creates the sync array.
-No i/o handler thread needs to be created for that
-@param[in] n_readers number of reader threads
-@param[in] n_writers number of writer threads
-@param[in] n_slots_sync number of slots in the sync aio array */
-bool
-os_aio_init(
- ulint n_readers,
- ulint n_writers,
- ulint n_slots_sync)
-{
- /* Maximum number of pending aio operations allowed per segment */
- ulint limit = 8 * OS_AIO_N_PENDING_IOS_PER_THREAD;
+ ulint dirnamelen = strlen(srv_log_group_home_dir);
- return(AIO::start(limit, n_readers, n_writers, n_slots_sync));
-}
+ ut_a(dirnamelen < (sizeof name) - 10 - sizeof "ib_logfile");
-/** Frees the asynchronous io system. */
-void
-os_aio_free()
-{
- AIO::shutdown();
+ memcpy(name, srv_log_group_home_dir, dirnamelen);
- ut_ad(!os_aio_segment_wait_events || !srv_use_native_aio);
- ut_ad(srv_use_native_aio || os_aio_segment_wait_events
- || !srv_was_started);
+ /* Add a path separator if needed. */
+ if (dirnamelen && name[dirnamelen - 1] != OS_PATH_SEPARATOR) {
- if (!srv_use_native_aio && os_aio_segment_wait_events) {
- for (ulint i = 0; i < os_aio_n_segments; i++) {
- os_event_destroy(os_aio_segment_wait_events[i]);
+ name[dirnamelen++] = OS_PATH_SEPARATOR;
}
- ut_free(os_aio_segment_wait_events);
- os_aio_segment_wait_events = 0;
- }
- os_aio_n_segments = 0;
-}
-
-/** Wakes up all async i/o threads so that they know to exit themselves in
-shutdown. */
-void
-os_aio_wake_all_threads_at_shutdown()
-{
-#ifdef WIN_ASYNC_IO
- AIO::wake_at_shutdown();
-#elif defined(LINUX_NATIVE_AIO)
- /* When using native AIO interface the io helper threads
- wait on io_getevents with a timeout value of 500ms. At
- each wake up these threads check the server status.
- No need to do anything to wake them up. */
-#endif /* !WIN_ASYNC_IO */
-
- if (srv_use_native_aio) {
- return;
- }
-
-	/* This loop wakes up all simulated i/o handler threads */
-
- for (ulint i = 0; i < os_aio_n_segments; ++i) {
-
- os_event_set(os_aio_segment_wait_events[i]);
- }
-}
-
-/** Waits until there are no pending writes in AIO::s_writes. There can
-be other, synchronous, pending writes. */
-void
-os_aio_wait_until_no_pending_writes()
-{
- AIO::wait_until_no_pending_writes();
-}
-
-/** Calculates segment number for a slot.
-@param[in] array AIO wait array
-@param[in] slot slot in this array
-@return segment number (which is the number used by, for example,
- I/O-handler threads) */
-ulint
-AIO::get_segment_no_from_slot(
- const AIO* array,
- const Slot* slot)
-{
- ulint segment;
- ulint seg_len;
-
- if (array == s_ibuf) {
- ut_ad(!srv_read_only_mode);
-
- segment = IO_IBUF_SEGMENT;
-
- } else if (array == s_log) {
- ut_ad(!srv_read_only_mode);
-
- segment = IO_LOG_SEGMENT;
-
- } else if (array == s_reads) {
- seg_len = s_reads->slots_per_segment();
-
- segment = (srv_read_only_mode ? 0 : 2) + slot->pos / seg_len;
- } else {
- ut_a(array == s_writes);
-
- seg_len = s_writes->slots_per_segment();
-
- segment = s_reads->m_n_segments
- + (srv_read_only_mode ? 0 : 2) + slot->pos / seg_len;
- }
-
- return(segment);
-}
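A worked example of the mapping above (illustrative numbers, assuming srv_read_only_mode is off): with s_reads->m_n_segments == 4 and 16 slots per write segment, a write slot with pos == 70 lands in global segment 4 + 2 + 70/16 = 10; segments 0 and 1 stay reserved for the ibuf and log arrays, and segments 2 through 5 serve the reads.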
-
-/** Requests for a slot in the aio array. If no slot is available, waits until
-not_full-event becomes signaled.
-
-@param[in] type IO context
-@param[in,out] m1 message to be passed along with the AIO
- operation
-@param[in,out] m2 message to be passed along with the AIO
- operation
-@param[in] file file handle
-@param[in] name name of the file or path as a NUL-terminated
- string
-@param[in,out] buf buffer where to read or from which to write
-@param[in] offset file offset, where to read from or start writing
-@param[in] len length of the block to read or write
-@return pointer to slot */
-Slot*
-AIO::reserve_slot(
- const IORequest& type,
- fil_node_t* m1,
- void* m2,
- pfs_os_file_t file,
- const char* name,
- void* buf,
- os_offset_t offset,
- ulint len)
-{
-#ifdef WIN_ASYNC_IO
- ut_a((len & 0xFFFFFFFFUL) == len);
-#endif /* WIN_ASYNC_IO */
-
- /* No need of a mutex. Only reading constant fields */
- ulint slots_per_seg;
-
- ut_ad(type.validate());
-
- slots_per_seg = slots_per_segment();
-
- /* We attempt to keep adjacent blocks in the same local
- segment. This can help in merging IO requests when we are
- doing simulated AIO */
- ulint local_seg;
-
- local_seg = (offset >> (srv_page_size_shift + 6)) % m_n_segments;
-
- for (;;) {
-
- acquire();
+ strcpy(name + dirnamelen, "ib_logfile0");
- if (m_n_reserved != m_slots.size()) {
- break;
- }
+ fd = my_open(name, O_RDONLY | O_CLOEXEC, MYF(0));
- release();
+ if (fd == -1) {
- if (!srv_use_native_aio) {
- /* If the handler threads are suspended,
- wake them so that we get more slots */
+ ib::warn()
+ << "Unable to open"
+ << " \"" << name << "\" to check native"
+ << " AIO read support.";
- os_aio_simulated_wake_handler_threads();
+ return(false);
}
-
- os_event_wait(m_not_full);
}
- ulint counter = 0;
- Slot* slot = NULL;
+ struct io_event io_event;
- /* We start our search for an available slot from our preferred
- local segment and do a full scan of the array. We are
- guaranteed to find a slot in full scan. */
- for (ulint i = local_seg * slots_per_seg;
- counter < m_slots.size();
- ++i, ++counter) {
+ memset(&io_event, 0x0, sizeof(io_event));
- i %= m_slots.size();
+ byte* buf = static_cast<byte*>(ut_malloc_nokey(srv_page_size * 2));
+ byte* ptr = static_cast<byte*>(ut_align(buf, srv_page_size));
- slot = at(i);
+ struct iocb iocb;
- if (slot->is_reserved == false) {
- break;
- }
- }
+ /* Suppress valgrind warning. */
+ memset(buf, 0x00, srv_page_size * 2);
+ memset(&iocb, 0x0, sizeof(iocb));
- /* We MUST always be able to get hold of a reserved slot. */
- ut_a(counter < m_slots.size());
+ struct iocb* p_iocb = &iocb;
- ut_a(slot->is_reserved == false);
+ if (!srv_read_only_mode) {
- ++m_n_reserved;
+ io_prep_pwrite(p_iocb, fd, ptr, srv_page_size, 0);
- if (m_n_reserved == 1) {
- os_event_reset(m_is_empty);
}
-
- if (m_n_reserved == m_slots.size()) {
- os_event_reset(m_not_full);
+ else {
+ ut_a(srv_page_size >= 512);
+ io_prep_pread(p_iocb, fd, ptr, 512, 0);
}
- slot->is_reserved = true;
- slot->reservation_time = time(NULL);
- slot->m1 = m1;
- slot->m2 = m2;
- slot->file = file;
- slot->name = name;
-#ifdef _WIN32
- slot->len = static_cast<DWORD>(len);
-#else
- slot->len = len;
-#endif /* _WIN32 */
- slot->type = type;
- slot->buf = static_cast<byte*>(buf);
- slot->ptr = slot->buf;
- slot->offset = offset;
- slot->err = DB_SUCCESS;
- slot->original_len = static_cast<uint32>(len);
- slot->io_already_done = false;
- slot->buf = static_cast<byte*>(buf);
-
-#ifdef WIN_ASYNC_IO
- {
- OVERLAPPED* control;
+ int err = io_submit(io_ctx, 1, &p_iocb);
+ srv_stats.buffered_aio_submitted.inc();
- control = &slot->control;
- control->Offset = (DWORD) offset & 0xFFFFFFFF;
- control->OffsetHigh = (DWORD) (offset >> 32);
+ if (err >= 1) {
+ /* Now collect the submitted IO request. */
+ err = io_getevents(io_ctx, 1, 1, &io_event, NULL);
}
-#elif defined(LINUX_NATIVE_AIO)
-
- /* If we are not using native AIO skip this part. */
- if (srv_use_native_aio) {
-
- off_t aio_offset;
-
- /* Check if we are dealing with 64 bit arch.
- If not then make sure that offset fits in 32 bits. */
- aio_offset = (off_t) offset;
-
- ut_a(sizeof(aio_offset) >= sizeof(offset)
- || ((os_offset_t) aio_offset) == offset);
-
- struct iocb* iocb = &slot->control;
- if (type.is_read()) {
-
- io_prep_pread(
- iocb, file, slot->ptr, slot->len, aio_offset);
- } else {
- ut_ad(type.is_write());
+ ut_free(buf);
+ my_close(fd, MYF(MY_WME));
- io_prep_pwrite(
- iocb, file, slot->ptr, slot->len, aio_offset);
- }
+ switch (err) {
+ case 1:
+ return(true);
- iocb->data = slot;
+ case -EINVAL:
+ case -ENOSYS:
+ ib::error()
+ << "Linux Native AIO not supported. You can either"
+ " move "
+ << (srv_read_only_mode ? name : "tmpdir")
+ << " to a file system that supports native"
+ " AIO or you can set innodb_use_native_aio to"
+ " FALSE to avoid this message.";
- slot->n_bytes = 0;
- slot->ret = 0;
+ /* fall through. */
+ default:
+ ib::error()
+ << "Linux Native AIO check on "
+ << (srv_read_only_mode ? name : "tmpdir")
+			<< " returned error[" << -err << "]";
}
-#endif /* LINUX_NATIVE_AIO */
-
- release();
- return(slot);
+ return(false);
}
+#endif /* LINUX_NATIVE_AIO */
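The same probe technique can be exercised outside the server. A minimal standalone sketch (the path "/tmp/aio_probe" and the 512-byte buffer are assumptions for illustration, and error handling is reduced to the minimum; build with -laio):

#include <libaio.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

int main()
{
	io_context_t ctx;
	memset(&ctx, 0, sizeof ctx);
	if (io_setup(1, &ctx)) {
		/* same condition as the io_setup() check above */
		puts("kernel lacks native aio");
		return 1;
	}
	int fd = open("/tmp/aio_probe", O_CREAT | O_RDWR, 0600);
	if (fd < 0) return 1;
	alignas(512) static char buf[512];
	struct iocb cb;
	io_prep_pwrite(&cb, fd, buf, sizeof buf, 0);
	struct iocb* list[1] = { &cb };
	struct io_event ev;
	/* a negative return such as -EINVAL is the tmpfs-style failure */
	int ret = io_submit(ctx, 1, list);
	if (ret == 1)
		ret = io_getevents(ctx, 1, 1, &ev, NULL);
	printf("native aio %s\n", ret == 1 ? "works" : "unsupported here");
	close(fd);
	io_destroy(ctx);
	return ret == 1 ? 0 : 1;
}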
-/** Wakes up a simulated aio i/o-handler thread if it has something to do.
-@param[in] global_segment The number of the segment in the AIO arrays */
-void
-AIO::wake_simulated_handler_thread(ulint global_segment)
-{
- ut_ad(!srv_use_native_aio);
-
- AIO* array;
- ulint segment = get_array_and_local_segment(&array, global_segment);
- array->wake_simulated_handler_thread(global_segment, segment);
-}
-/** Wakes up a simulated AIO I/O-handler thread if it has something to do
-for a local segment in the AIO array.
-@param[in] global_segment The number of the segment in the AIO arrays
-@param[in] segment The local segment in the AIO array */
-void
-AIO::wake_simulated_handler_thread(ulint global_segment, ulint segment)
+bool os_aio_init(ulint n_reader_threads, ulint n_writer_threads, ulint)
{
- ut_ad(!srv_use_native_aio);
-
- ulint n = slots_per_segment();
- ulint offset = segment * n;
+ int max_write_events = (int)n_writer_threads * OS_AIO_N_PENDING_IOS_PER_THREAD;
+ int max_read_events = (int)n_reader_threads * OS_AIO_N_PENDING_IOS_PER_THREAD;
+ int max_ibuf_events = 1 * OS_AIO_N_PENDING_IOS_PER_THREAD;
+ int max_events = max_read_events + max_write_events + max_ibuf_events;
+ int ret;
- /* Look through n slots after the segment * n'th slot */
-
- acquire();
-
- const Slot* slot = at(offset);
-
- for (ulint i = 0; i < n; ++i, ++slot) {
-
- if (slot->is_reserved) {
-
- /* Found an i/o request */
-
- release();
-
- os_event_t event;
-
- event = os_aio_segment_wait_events[global_segment];
-
- os_event_set(event);
-
- return;
- }
+#ifdef LINUX_NATIVE_AIO
+ if (srv_use_native_aio && !is_linux_native_aio_supported())
+ srv_use_native_aio = false;
+#endif
+ ret = srv_thread_pool->configure_aio(srv_use_native_aio, max_events);
+	if (ret) {
+ ut_a(srv_use_native_aio);
+ srv_use_native_aio = false;
+#ifdef LINUX_NATIVE_AIO
+ ib::info() << "Linux native AIO disabled";
+#endif
+ ret = srv_thread_pool->configure_aio(srv_use_native_aio, max_events);
+ DBUG_ASSERT(!ret);
}
-
- release();
+ read_slots = new io_slots(max_read_events, (uint)n_reader_threads);
+ write_slots = new io_slots(max_write_events, (uint)n_writer_threads);
+ ibuf_slots = new io_slots(max_ibuf_events, 1);
+ return true;
}
-/** Wakes up simulated aio i/o-handler threads if they have something to do. */
-void
-os_aio_simulated_wake_handler_threads()
+void os_aio_free(void)
{
- if (srv_use_native_aio) {
- /* We do not use simulated aio: do nothing */
-
- return;
- }
-
- os_aio_recommend_sleep_for_read_threads = false;
-
- for (ulint i = 0; i < os_aio_n_segments; i++) {
- AIO::wake_simulated_handler_thread(i);
- }
+ srv_thread_pool->disable_aio();
+ delete read_slots;
+ delete write_slots;
+ delete ibuf_slots;
}
-/** Select the IO slot array
-@param[in,out] type Type of IO, READ or WRITE
-@param[in] read_only true if running in read-only mode
-@param[in] mode IO mode
-@return slot array or NULL if invalid mode specified */
-AIO*
-AIO::select_slot_array(IORequest& type, bool read_only, ulint mode)
+/** Waits until there are no pending writes. There can
+be other, synchronous, pending writes. */
+void
+os_aio_wait_until_no_pending_writes()
{
- AIO* array;
-
- ut_ad(type.validate());
-
- switch (mode) {
- case OS_AIO_NORMAL:
-
- array = type.is_read() ? AIO::s_reads : AIO::s_writes;
- break;
-
- case OS_AIO_IBUF:
- ut_ad(type.is_read());
-
- /* Reduce probability of deadlock bugs in connection with ibuf:
- do not let the ibuf i/o handler sleep */
-
- type.clear_do_not_wake();
-
- array = read_only ? AIO::s_reads : AIO::s_ibuf;
- break;
-
- case OS_AIO_LOG:
-
- array = read_only ? AIO::s_reads : AIO::s_log;
- break;
-
- case OS_AIO_SYNC:
-
- array = AIO::s_sync;
-#if defined(LINUX_NATIVE_AIO)
- /* In Linux native AIO we don't use sync IO array. */
- ut_a(!srv_use_native_aio);
-#endif /* LINUX_NATIVE_AIO */
- break;
-
- default:
- ut_error;
- array = NULL; /* Eliminate compiler warning */
- }
-
- return(array);
+ write_slots->wait();
}
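Here wait() is assumed to mean "block until every acquired write slot has been released back to the cache". A minimal sketch of that semantic with a counter and a condition variable (pending_tracker is a hypothetical stand-in for the bookkeeping inside io_slots, not its real implementation):

#include <condition_variable>
#include <cstddef>
#include <mutex>

class pending_tracker {
	std::mutex		m_mtx;
	std::condition_variable	m_cv;
	size_t			m_pending = 0;
public:
	void begin() {			/* called when a slot is acquired */
		std::lock_guard<std::mutex> g(m_mtx);
		++m_pending;
	}
	void end() {			/* called when completion releases it */
		std::lock_guard<std::mutex> g(m_mtx);
		if (--m_pending == 0) m_cv.notify_all();
	}
	void wait() {			/* analogue of write_slots->wait() */
		std::unique_lock<std::mutex> l(m_mtx);
		m_cv.wait(l, [this] { return m_pending == 0; });
	}
};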
-#ifdef WIN_ASYNC_IO
-/** This function is only used in Windows asynchronous i/o.
-Waits for an aio operation to complete. This function is used to wait the
-for completed requests. The aio array of pending requests is divided
-into segments. The thread specifies which segment or slot it wants to wait
-for. NOTE: this function will also take care of freeing the aio slot,
-therefore no other thread is allowed to do the freeing!
-@param[in] segment The number of the segment in the aio arrays to
- wait for; segment 0 is the ibuf I/O thread,
- segment 1 the log I/O thread, then follow the
- non-ibuf read threads, and as the last are the
- non-ibuf write threads; if this is
- ULINT_UNDEFINED, then it means that sync AIO
- is used, and this parameter is ignored
-@param[in] pos this parameter is used only in sync AIO:
- wait for the aio slot at this position
-@param[out] m1 the messages passed with the AIO request; note
- that also in the case where the AIO operation
- failed, these output parameters are valid and
- can be used to restart the operation,
- for example
-@param[out] m2 callback message
-@param[out] type OS_FILE_WRITE or ..._READ
-@return DB_SUCCESS or error code */
-
-
-
-static
-dberr_t
-os_aio_windows_handler(
- ulint segment,
- ulint pos,
- fil_node_t** m1,
- void** m2,
- IORequest* type)
-{
- Slot* slot= 0;
- dberr_t err;
-
- BOOL ret;
- ULONG_PTR key;
-
- ut_a(segment != ULINT_UNDEFINED);
-
- /* NOTE! We only access constant fields in os_aio_array. Therefore
- we do not have to acquire the protecting mutex yet */
-
- ut_ad(os_aio_validate_skip());
- AIO *my_array;
- AIO::get_array_and_local_segment(&my_array, segment);
-
- HANDLE port = my_array->m_completion_port;
- ut_ad(port);
- for (;;) {
- DWORD len;
- ret = GetQueuedCompletionStatus(port, &len, &key,
- (OVERLAPPED **)&slot, INFINITE);
-
- /* If shutdown key was received, repost the shutdown message and exit */
- if (ret && key == IOCP_SHUTDOWN_KEY) {
- PostQueuedCompletionStatus(port, 0, key, NULL);
- *m1 = NULL;
- *m2 = NULL;
- return (DB_SUCCESS);
- }
-
- ut_a(slot);
-
- if (!ret) {
- /* IO failed */
- break;
- }
-
- slot->n_bytes= len;
- ut_a(slot->array);
- HANDLE slot_port = slot->array->m_completion_port;
- if (slot_port != port) {
- /* there are no redirections between data and log */
- ut_ad(port == data_completion_port);
- ut_ad(slot_port != log_completion_port);
-
- /*
- Redirect completions to the dedicated completion port
- and threads.
-
- "Write array" threads receive write,read and ibuf
- notifications, read and ibuf completions are redirected.
-
- Forwarding IO completion this way costs a context switch,
- and this seems tolerable since asynchronous reads are by
- far less frequent.
- */
- ut_a(PostQueuedCompletionStatus(slot_port,
- len, key, &slot->control));
- }
- else {
- break;
- }
- }
-
- ut_a(slot->is_reserved);
-
- *m1 = slot->m1;
- *m2 = slot->m2;
-
- *type = slot->type;
-
- bool retry = false;
-
- if (ret && slot->n_bytes == slot->len) {
-
- err = DB_SUCCESS;
-
- } else if (os_file_handle_error(slot->name, "Windows aio")) {
-
- retry = true;
-
- } else {
-
- err = DB_IO_ERROR;
- }
-
-
- if (retry) {
- /* Retry failed read/write operation synchronously. */
-
-#ifdef UNIV_PFS_IO
- /* This read/write does not go through os_file_read
- and os_file_write APIs, need to register with
- performance schema explicitly here. */
- PSI_file_locker_state state;
- struct PSI_file_locker* locker = NULL;
-
- register_pfs_file_io_begin(
- &state, locker, slot->file, slot->len,
- slot->type.is_write()
- ? PSI_FILE_WRITE : PSI_FILE_READ, __FILE__, __LINE__);
-#endif /* UNIV_PFS_IO */
-
- ut_a((slot->len & 0xFFFFFFFFUL) == slot->len);
-
- ssize_t n_bytes = SyncFileIO::execute(slot);
-
-#ifdef UNIV_PFS_IO
- register_pfs_file_io_end(locker, slot->len);
-#endif /* UNIV_PFS_IO */
- err = (n_bytes == slot->len) ? DB_SUCCESS : DB_IO_ERROR;
- }
-
- if (err == DB_SUCCESS) {
- err = AIOHandler::post_io_processing(slot);
- }
-
- slot->array->release_with_mutex(slot);
-
- if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
- && !buf_page_cleaner_is_active
- && os_aio_all_slots_free()) {
- /* Last IO, wakeup other io threads */
- AIO::wake_at_shutdown();
- }
- return(err);
-}
-#endif /* WIN_ASYNC_IO */
/**
NOTE! Use the corresponding macro os_aio(), not directly this function!
@@ -6485,14 +4127,10 @@ os_aio_func(
fil_node_t* m1,
void* m2)
{
-#ifdef WIN_ASYNC_IO
- BOOL ret = TRUE;
-#endif /* WIN_ASYNC_IO */
ut_ad(n > 0);
ut_ad((n % OS_FILE_LOG_BLOCK_SIZE) == 0);
ut_ad((offset % OS_FILE_LOG_BLOCK_SIZE) == 0);
- ut_ad(os_aio_validate_skip());
#ifdef WIN_ASYNC_IO
ut_ad((n & 0xFFFFFFFFUL) == n);
@@ -6511,777 +4149,43 @@ os_aio_func(
return(os_file_write_func(type, name, file, buf, offset, n));
}
-try_again:
-
- AIO* array;
-
- array = AIO::select_slot_array(type, read_only, mode);
-
- Slot* slot;
-
- slot = array->reserve_slot(type, m1, m2, file, name, buf, offset, n);
-
if (type.is_read()) {
-
-
- if (srv_use_native_aio) {
-
++os_n_file_reads;
-
- os_bytes_read_since_printout += n;
-#ifdef WIN_ASYNC_IO
- ret = ReadFile(
- file, slot->ptr, slot->len,
- NULL, &slot->control);
-#elif defined(LINUX_NATIVE_AIO)
- if (!array->linux_dispatch(slot)) {
- goto err_exit;
- }
-#endif /* WIN_ASYNC_IO */
- } else if (type.is_wake()) {
- AIO::wake_simulated_handler_thread(
- AIO::get_segment_no_from_slot(array, slot));
- }
} else if (type.is_write()) {
-
- if (srv_use_native_aio) {
++os_n_file_writes;
-
-#ifdef WIN_ASYNC_IO
- ret = WriteFile(
- file, slot->ptr, slot->len,
- NULL, &slot->control);
-#elif defined(LINUX_NATIVE_AIO)
- if (!array->linux_dispatch(slot)) {
- goto err_exit;
- }
-#endif /* WIN_ASYNC_IO */
-
- } else if (type.is_wake()) {
- AIO::wake_simulated_handler_thread(
- AIO::get_segment_no_from_slot(array, slot));
- }
} else {
ut_error;
}
-#ifdef WIN_ASYNC_IO
- if (ret || (GetLastError() == ERROR_IO_PENDING)) {
- /* aio completed or was queued successfully! */
- return(DB_SUCCESS);
+ compile_time_assert(sizeof(os_aio_userdata_t) <= tpool::MAX_AIO_USERDATA_LEN);
+	os_aio_userdata_t userdata{m1, type, m2};
+ io_slots* slots;
+ if (type.is_read()) {
+		slots = mode == OS_AIO_IBUF ? ibuf_slots : read_slots;
+ } else {
+ slots = write_slots;
}
+ tpool::aiocb* cb = slots->acquire();
- goto err_exit;
+ cb->m_buffer = buf;
+ cb->m_callback = (tpool::callback_func)io_callback;
+ cb->m_group = slots->get_task_group();
+ cb->m_fh = file.m_file;
+ cb->m_len = (int)n;
+ cb->m_offset = offset;
+ cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE;
+ memcpy(cb->m_userdata, &userdata, sizeof(userdata));
-#endif /* WIN_ASYNC_IO */
-
- /* AIO request was queued successfully! */
- return(DB_SUCCESS);
+ if (!srv_thread_pool->submit_io(cb))
+ return DB_SUCCESS;
-#if defined LINUX_NATIVE_AIO || defined WIN_ASYNC_IO
-err_exit:
-#endif /* LINUX_NATIVE_AIO || WIN_ASYNC_IO */
+ slots->release(cb);
- array->release_with_mutex(slot);
-
- if (os_file_handle_error(
- name, type.is_read() ? "aio read" : "aio write")) {
-
- goto try_again;
- }
+ os_file_handle_error(name, type.is_read() ? "aio read" : "aio write");
return(DB_IO_ERROR);
}
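The submission path above copies a small POD (m1, type, m2) into the request's fixed-size userdata buffer, and the completion callback recovers it. A minimal sketch of that round-trip, assuming a trivially copyable message (MAX_USERDATA_LEN, message and request_t are illustrative stand-ins for tpool::MAX_AIO_USERDATA_LEN, os_aio_userdata_t and tpool::aiocb):

#include <cstddef>
#include <cstring>
#include <type_traits>

static const size_t MAX_USERDATA_LEN = 40;

struct message { void* m1; int type; void* m2; };

struct request_t { char userdata[MAX_USERDATA_LEN]; };

static void pack(request_t* r, const message& m)
{
	/* the compile_time_assert above plays the same role as these */
	static_assert(std::is_trivially_copyable<message>::value, "memcpy-safe");
	static_assert(sizeof(message) <= MAX_USERDATA_LEN, "fits in the buffer");
	memcpy(r->userdata, &m, sizeof m);
}

static message unpack(const request_t* r)
{
	message m;			/* completion side, cf. fil_aio_callback() */
	memcpy(&m, r->userdata, sizeof m);
	return m;
}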
-/** Simulated AIO handler for reaping IO requests */
-class SimulatedAIOHandler {
-
-public:
-
- /** Constructor
- @param[in,out] array The AIO array
- @param[in] segment Local segment in the array */
- SimulatedAIOHandler(AIO* array, ulint segment)
- :
- m_oldest(),
- m_n_elems(),
- m_lowest_offset(IB_UINT64_MAX),
- m_array(array),
- m_n_slots(),
- m_segment(segment),
- m_ptr(),
- m_buf()
- {
- ut_ad(m_segment < 100);
-
- m_slots.resize(OS_AIO_MERGE_N_CONSECUTIVE);
- }
-
- /** Destructor */
- ~SimulatedAIOHandler()
- {
- if (m_ptr != NULL) {
- ut_free(m_ptr);
- }
- }
-
- /** Reset the state of the handler
- @param[in] n_slots Number of pending AIO operations supported */
- void init(ulint n_slots)
- {
- m_oldest = 0;
- m_n_elems = 0;
- m_n_slots = n_slots;
- m_lowest_offset = IB_UINT64_MAX;
-
- if (m_ptr != NULL) {
- ut_free(m_ptr);
- m_ptr = m_buf = NULL;
- }
-
- m_slots[0] = NULL;
- }
-
- /** Check if there is a slot for which the i/o has already been done
- @param[out] n_reserved Number of reserved slots
- @return the first completed slot that is found. */
- Slot* check_completed(ulint* n_reserved)
- {
- ulint offset = m_segment * m_n_slots;
-
- *n_reserved = 0;
-
- Slot* slot;
-
- slot = m_array->at(offset);
-
- for (ulint i = 0; i < m_n_slots; ++i, ++slot) {
-
- if (slot->is_reserved) {
-
- if (slot->io_already_done) {
-
- ut_a(slot->is_reserved);
-
- return(slot);
- }
-
- ++*n_reserved;
- }
- }
-
- return(NULL);
- }
-
- /** If there are at least 2 seconds old requests, then pick the
- oldest one to prevent starvation. If several requests have the
- same age, then pick the one at the lowest offset.
- @return true if request was selected */
- bool select()
- {
- if (!select_oldest()) {
-
- return(select_lowest_offset());
- }
-
- return(true);
- }
-
- /** Check if there are several consecutive blocks
- to read or write. Merge them if found. */
- void merge()
- {
- /* if m_n_elems != 0, then we have assigned
- something valid to consecutive_ios[0] */
- ut_ad(m_n_elems != 0);
- ut_ad(first_slot() != NULL);
-
- Slot* slot = first_slot();
-
- while (!merge_adjacent(slot)) {
- /* No op */
- }
- }
-
- /** We have now collected n_consecutive I/O requests
- in the array; allocate a single buffer which can hold
- all data, and perform the I/O
- @return the length of the buffer */
- ulint allocate_buffer()
- MY_ATTRIBUTE((warn_unused_result))
- {
- ulint len;
- Slot* slot = first_slot();
-
- ut_ad(m_ptr == NULL);
-
- if (slot->type.is_read() && m_n_elems > 1) {
-
- len = 0;
-
- for (ulint i = 0; i < m_n_elems; ++i) {
- len += m_slots[i]->len;
- }
-
- m_ptr = static_cast<byte*>(
- ut_malloc_nokey(len + srv_page_size));
-
- m_buf = static_cast<byte*>(
- ut_align(m_ptr, srv_page_size));
-
- } else {
- len = first_slot()->len;
- m_buf = first_slot()->buf;
- }
-
- return(len);
- }
-
- /** We have to compress the individual pages and punch
- holes in them on a page by page basis when writing to
-	tables that can be compressed at the IO level.
- @param[in] len Value returned by allocate_buffer */
- void copy_to_buffer(ulint len)
- {
- Slot* slot = first_slot();
-
- if (len > slot->len && slot->type.is_write()) {
-
- byte* ptr = m_buf;
-
- ut_ad(ptr != slot->buf);
-
- /* Copy the buffers to the combined buffer */
- for (ulint i = 0; i < m_n_elems; ++i) {
-
- slot = m_slots[i];
-
- memmove(ptr, slot->buf, slot->len);
-
- ptr += slot->len;
- }
- }
- }
-
- /** Do the I/O with ordinary, synchronous i/o functions:
- @param[in] len Length of buffer for IO */
- void io()
- {
- if (first_slot()->type.is_write()) {
-
- for (ulint i = 0; i < m_n_elems; ++i) {
- write(m_slots[i]);
- }
-
- } else {
-
- for (ulint i = 0; i < m_n_elems; ++i) {
- read(m_slots[i]);
- }
- }
- }
-
- /** Mark the i/os done in slots */
- void done()
- {
- for (ulint i = 0; i < m_n_elems; ++i) {
- m_slots[i]->io_already_done = true;
- }
- }
-
- /** @return the first slot in the consecutive array */
- Slot* first_slot()
- MY_ATTRIBUTE((warn_unused_result))
- {
- ut_a(m_n_elems > 0);
-
- return(m_slots[0]);
- }
-
- /** Wait for I/O requests
- @param[in] global_segment The global segment
- @param[in,out] event Wait on event if no active requests
- @return the number of slots */
- ulint check_pending(
- ulint global_segment,
- os_event_t event)
- MY_ATTRIBUTE((warn_unused_result));
-private:
-
- /** Do the file read
- @param[in,out] slot Slot that has the IO context */
- void read(Slot* slot)
- {
- dberr_t err = os_file_read(
- slot->type,
- slot->file,
- slot->ptr,
- slot->offset,
- slot->len);
-
- ut_a(err == DB_SUCCESS);
- }
-
- /** Do the file read
- @param[in,out] slot Slot that has the IO context */
- void write(Slot* slot)
- {
- dberr_t err = os_file_write(
- slot->type,
- slot->name,
- slot->file,
- slot->ptr,
- slot->offset,
- slot->len);
-
- ut_a(err == DB_SUCCESS);
- }
-
- /** @return true if the slots are adjacent and can be merged */
- bool adjacent(const Slot* s1, const Slot* s2) const
- {
- return(s1 != s2
- && s1->file == s2->file
- && s2->offset == s1->offset + s1->len
- && s1->type == s2->type);
- }
-
- /** @return true if merge limit reached or no adjacent slots found. */
- bool merge_adjacent(Slot*& current)
- {
- Slot* slot;
- ulint offset = m_segment * m_n_slots;
-
- slot = m_array->at(offset);
-
- for (ulint i = 0; i < m_n_slots; ++i, ++slot) {
-
- if (slot->is_reserved && adjacent(current, slot)) {
-
- current = slot;
-
- /* Found a consecutive i/o request */
-
- m_slots[m_n_elems] = slot;
-
- ++m_n_elems;
-
- return(m_n_elems >= m_slots.capacity());
- }
- }
-
- return(true);
- }
-
- /** There were no old requests. Look for an I/O request at the lowest
- offset in the array (we ignore the high 32 bits of the offset in these
- heuristics) */
- bool select_lowest_offset()
- {
- ut_ad(m_n_elems == 0);
-
- ulint offset = m_segment * m_n_slots;
-
- m_lowest_offset = IB_UINT64_MAX;
-
- for (ulint i = 0; i < m_n_slots; ++i) {
- Slot* slot;
-
- slot = m_array->at(i + offset);
-
- if (slot->is_reserved
- && slot->offset < m_lowest_offset) {
-
- /* Found an i/o request */
- m_slots[0] = slot;
-
- m_n_elems = 1;
-
- m_lowest_offset = slot->offset;
- }
- }
-
- return(m_n_elems > 0);
- }
-
- /** Select the slot if it is older than the current oldest slot.
- @param[in] slot The slot to check */
- void select_if_older(Slot* slot)
- {
- ulint age;
-
- age = (ulint) difftime(time(NULL), slot->reservation_time);
-
- if ((age >= 2 && age > m_oldest)
- || (age >= 2
- && age == m_oldest
- && slot->offset < m_lowest_offset)) {
-
- /* Found an i/o request */
- m_slots[0] = slot;
-
- m_n_elems = 1;
-
- m_oldest = age;
-
- m_lowest_offset = slot->offset;
- }
- }
-
-	/** Select the oldest slot in the array
- @return true if oldest slot found */
- bool select_oldest()
- {
- ut_ad(m_n_elems == 0);
-
- Slot* slot;
- ulint offset = m_n_slots * m_segment;
-
- slot = m_array->at(offset);
-
- for (ulint i = 0; i < m_n_slots; ++i, ++slot) {
-
- if (slot->is_reserved) {
- select_if_older(slot);
- }
- }
-
- return(m_n_elems > 0);
- }
-
- typedef std::vector<Slot*> slots_t;
-
-private:
- ulint m_oldest;
- ulint m_n_elems;
- os_offset_t m_lowest_offset;
-
- AIO* m_array;
- ulint m_n_slots;
- ulint m_segment;
-
- slots_t m_slots;
-
- byte* m_ptr;
- byte* m_buf;
-};
-
-/** Wait for I/O requests
-@return the number of slots */
-ulint
-SimulatedAIOHandler::check_pending(
- ulint global_segment,
- os_event_t event)
-{
- /* NOTE! We only access constant fields in os_aio_array.
- Therefore we do not have to acquire the protecting mutex yet */
-
- ut_ad(os_aio_validate_skip());
-
- ut_ad(m_segment < m_array->get_n_segments());
-
- /* Look through n slots after the segment * n'th slot */
-
- if (AIO::is_read(m_array)
- && os_aio_recommend_sleep_for_read_threads) {
-
- /* Give other threads chance to add several
- I/Os to the array at once. */
-
- srv_set_io_thread_op_info(
- global_segment, "waiting for i/o request");
-
- os_event_wait(event);
-
- return(0);
- }
-
- return(m_array->slots_per_segment());
-}
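For reference, the heuristic implemented by the removed adjacent()/merge_adjacent() above can be sketched in isolation: coalesce requests that target the same file in the same direction with byte-adjacent ranges, merging at most OS_AIO_MERGE_N_CONSECUTIVE requests per run. A simplified sketch (req_t and coalesce are illustrative; the real code scanned the unsorted slot array in place rather than sorting a copy):

#include <algorithm>
#include <cstddef>
#include <vector>

struct req_t { int fd; bool is_read; unsigned long long offset, len; };

static std::vector<req_t> coalesce(std::vector<req_t> v, size_t max_n)
{
	std::sort(v.begin(), v.end(), [](const req_t& a, const req_t& b) {
		return a.offset < b.offset;
	});
	std::vector<req_t> out;
	size_t run = 0;		/* requests merged into out.back() */
	for (const req_t& r : v) {
		if (!out.empty() && run < max_n
		    && out.back().fd == r.fd
		    && out.back().is_read == r.is_read
		    && out.back().offset + out.back().len == r.offset) {
			out.back().len += r.len;	/* extend previous */
			++run;
		} else {
			out.push_back(r);
			run = 1;
		}
	}
	return out;
}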
-
-/** Does simulated AIO. This function should be called by an i/o-handler
-thread.
-
-@param[in] segment The number of the segment in the aio arrays to wait
- for; segment 0 is the ibuf i/o thread, segment 1 the
- log i/o thread, then follow the non-ibuf read threads,
- and as the last are the non-ibuf write threads
-@param[out] m1 the messages passed with the AIO request; note that
- also in the case where the AIO operation failed, these
- output parameters are valid and can be used to restart
- the operation, for example
-@param[out] m2 Callback argument
-@param[in] type IO context
-@return DB_SUCCESS or error code */
-static
-dberr_t
-os_aio_simulated_handler(
- ulint global_segment,
- fil_node_t** m1,
- void** m2,
- IORequest* type)
-{
- Slot* slot;
- AIO* array;
- ulint segment;
- os_event_t event = os_aio_segment_wait_events[global_segment];
-
- segment = AIO::get_array_and_local_segment(&array, global_segment);
-
- SimulatedAIOHandler handler(array, segment);
-
- for (;;) {
-
- srv_set_io_thread_op_info(
- global_segment, "looking for i/o requests (a)");
-
- ulint n_slots = handler.check_pending(global_segment, event);
-
- if (n_slots == 0) {
- continue;
- }
-
- handler.init(n_slots);
-
- srv_set_io_thread_op_info(
- global_segment, "looking for i/o requests (b)");
-
- array->acquire();
-
- ulint n_reserved;
-
- slot = handler.check_completed(&n_reserved);
-
- if (slot != NULL) {
-
- break;
-
- } else if (n_reserved == 0
- && !buf_page_cleaner_is_active
- && srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
-
- /* There is no completed request. If there
- are no pending request at all, and the system
- is being shut down, exit. */
-
- array->release();
-
- *m1 = NULL;
-
- *m2 = NULL;
-
- return(DB_SUCCESS);
-
- } else if (handler.select()) {
-
- break;
- }
-
- /* No I/O requested at the moment */
-
- srv_set_io_thread_op_info(
- global_segment, "resetting wait event");
-
-		/* We wait here until there are more IO requests
- for this segment. */
-
- os_event_reset(event);
-
- array->release();
-
- srv_set_io_thread_op_info(
- global_segment, "waiting for i/o request");
-
- os_event_wait(event);
- }
-
- /** Found a slot that has already completed its IO */
-
- if (slot == NULL) {
- /* Merge adjacent requests */
- handler.merge();
-
- /* Check if there are several consecutive blocks
- to read or write */
-
- srv_set_io_thread_op_info(
- global_segment, "consecutive i/o requests");
-
- // Note: We don't support write combining for simulated AIO.
- //ulint total_len = handler.allocate_buffer();
-
- /* We release the array mutex for the time of the I/O: NOTE that
- this assumes that there is just one i/o-handler thread serving
- a single segment of slots! */
-
- array->release();
-
- // Note: We don't support write combining for simulated AIO.
- //handler.copy_to_buffer(total_len);
-
- srv_set_io_thread_op_info(global_segment, "doing file i/o");
-
- handler.io();
-
- srv_set_io_thread_op_info(global_segment, "file i/o done");
-
- array->acquire();
-
- handler.done();
-
- /* We return the messages for the first slot now, and if there
- were several slots, the messages will be returned with
- subsequent calls of this function */
-
- slot = handler.first_slot();
- }
-
- ut_ad(slot->is_reserved);
-
- *m1 = slot->m1;
- *m2 = slot->m2;
-
- *type = slot->type;
-
- array->release(slot);
-
- array->release();
-
- return(DB_SUCCESS);
-}
-
-/** Get the total number of pending IOs
-@return the total number of pending IOs */
-ulint
-AIO::total_pending_io_count()
-{
- ulint count = s_reads->pending_io_count();
-
- if (s_writes != NULL) {
- count += s_writes->pending_io_count();
- }
-
- if (s_ibuf != NULL) {
- count += s_ibuf->pending_io_count();
- }
-
- if (s_log != NULL) {
- count += s_log->pending_io_count();
- }
-
- if (s_sync != NULL) {
- count += s_sync->pending_io_count();
- }
-
- return(count);
-}
-
-/** Validates the consistency the aio system.
-@return true if ok */
-static
-bool
-os_aio_validate()
-{
-	/* The method counts and validates; we ignore the count. */
- AIO::total_pending_io_count();
-
- return(true);
-}
-
-/** Prints pending IO requests per segment of an aio array.
-We probably don't need per-segment statistics, but they can help us
-during the development phase to see if the IO requests are being
-distributed as expected.
-@param[in,out] file File where to print
-@param[in] segments Pending IO array */
-void
-AIO::print_segment_info(
- FILE* file,
- const ulint* segments)
-{
- ut_ad(m_n_segments > 0);
-
- if (m_n_segments > 1) {
-
- fprintf(file, " [");
-
- for (ulint i = 0; i < m_n_segments; ++i, ++segments) {
-
- if (i != 0) {
- fprintf(file, ", ");
- }
-
- fprintf(file, ULINTPF, *segments);
- }
-
- fprintf(file, "] ");
- }
-}
-
-/** Prints info about the aio array.
-@param[in,out] file Where to print */
-void
-AIO::print(FILE* file)
-{
- ulint count = 0;
- ulint n_res_seg[SRV_MAX_N_IO_THREADS];
-
- mutex_enter(&m_mutex);
-
- ut_a(!m_slots.empty());
- ut_a(m_n_segments > 0);
-
- memset(n_res_seg, 0x0, sizeof(n_res_seg));
-
- for (ulint i = 0; i < m_slots.size(); ++i) {
- Slot& slot = m_slots[i];
- ulint segment = (i * m_n_segments) / m_slots.size();
-
- if (slot.is_reserved) {
-
- ++count;
-
- ++n_res_seg[segment];
-
- ut_a(slot.len > 0);
- }
- }
-
- ut_a(m_n_reserved == count);
-
- print_segment_info(file, n_res_seg);
-
- mutex_exit(&m_mutex);
-}
-
-/** Print all the AIO segments
-@param[in,out] file Where to print */
-void
-AIO::print_all(FILE* file)
-{
- s_reads->print(file);
-
- if (s_writes != NULL) {
- fputs(", aio writes:", file);
- s_writes->print(file);
- }
-
- if (s_ibuf != NULL) {
- fputs(",\n ibuf aio reads:", file);
- s_ibuf->print(file);
- }
-
- if (s_log != NULL) {
- fputs(", log i/o's:", file);
- s_log->print(file);
- }
-
- if (s_sync != NULL) {
- fputs(", sync i/o's:", file);
- s_sync->print(file);
- }
-}
-
/** Prints info of the aio arrays.
@param[in,out] file file where to print */
void
@@ -7297,19 +4201,11 @@ os_aio_print(FILE* file)
srv_io_thread_op_info[i],
srv_io_thread_function[i]);
-#ifndef _WIN32
- if (!srv_use_native_aio
- && os_event_is_set(os_aio_segment_wait_events[i])) {
- fprintf(file, " ev set");
- }
-#endif /* _WIN32 */
-
fprintf(file, "\n");
}
fputs("Pending normal aio reads:", file);
- AIO::print_all(file);
putc('\n', file);
current_time = time(NULL);
@@ -7381,82 +4277,6 @@ os_aio_refresh_stats()
os_last_printout = time(NULL);
}
-/** Checks that all slots in the system have been freed, that is, there are
-no pending io operations.
-@return true if all free */
-bool
-os_aio_all_slots_free()
-{
- return(AIO::total_pending_io_count() == 0);
-}
-
-#ifdef UNIV_DEBUG
-/** Prints all pending IO for the array
-@param[in] file file where to print
-@param[in] array array to process */
-void
-AIO::to_file(FILE* file) const
-{
- acquire();
-
- fprintf(file, " " ULINTPF "\n", m_n_reserved);
-
- for (ulint i = 0; i < m_slots.size(); ++i) {
-
- const Slot& slot = m_slots[i];
-
- if (slot.is_reserved) {
-
- fprintf(file,
- "%s IO for %s (offset=" UINT64PF
- ", size=%lu)\n",
- slot.type.is_read() ? "read" : "write",
- slot.name, slot.offset, (unsigned long)(slot.len));
- }
- }
-
- release();
-}
-
-/** Print pending IOs for all arrays */
-void
-AIO::print_to_file(FILE* file)
-{
- fprintf(file, "Pending normal aio reads:");
-
- s_reads->to_file(file);
-
- if (s_writes != NULL) {
- fprintf(file, "Pending normal aio writes:");
- s_writes->to_file(file);
- }
-
- if (s_ibuf != NULL) {
- fprintf(file, "Pending ibuf aio reads:");
- s_ibuf->to_file(file);
- }
-
- if (s_log != NULL) {
- fprintf(file, "Pending log i/o's:");
- s_log->to_file(file);
- }
-
- if (s_sync != NULL) {
- fprintf(file, "Pending sync i/o's:");
- s_sync->to_file(file);
- }
-}
-
-/** Prints all pending IO
-@param[in] file File where to print */
-void
-os_aio_print_pending_io(
- FILE* file)
-{
- AIO::print_to_file(file);
-}
-
-#endif /* UNIV_DEBUG */
/**
Set the file create umask