diff options
author | Tamar Christina <tamar@zhox.com> | 2019-06-16 21:31:22 +0100 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-07-15 16:41:02 -0400 |
commit | 4489af6bad11a198e9e6c192f41e17020f28d0c1 (patch) | |
tree | a7046d2982400ef86d1e026947618c29b908cd62 | |
parent | 4bf542bf1cdf2fa468457fc0af21333478293476 (diff) | |
download | haskell-4489af6bad11a198e9e6c192f41e17020f28d0c1.tar.gz |
winio: core threaded I/O manager
-rw-r--r-- | libraries/base/GHC/Conc/IOCP.hs | 29 | ||||
-rw-r--r-- | libraries/base/GHC/Conc/POSIX.hs | 297 | ||||
-rw-r--r-- | libraries/base/GHC/Conc/POSIX/Const.hsc | 30 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Internal/Types.hs | 160 | ||||
-rw-r--r-- | libraries/base/GHC/Event/TimeOut.hs | 40 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Windows.hsc | 1188 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Windows/Clock.hs | 55 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Windows/ConsoleEvent.hsc | 72 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Windows/FFI.hsc | 395 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Windows/ManagedThreadPool.hs | 98 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Windows/Thread.hs | 43 | ||||
-rw-r--r-- | libraries/base/GHC/IO/Types.hs | 41 | ||||
-rw-r--r-- | libraries/base/cbits/consUtils.c | 4 | ||||
-rw-r--r-- | libraries/base/include/winio_structs.h | 40 |
14 files changed, 2490 insertions, 2 deletions
diff --git a/libraries/base/GHC/Conc/IOCP.hs b/libraries/base/GHC/Conc/IOCP.hs new file mode 100644 index 0000000000..299e4fed91 --- /dev/null +++ b/libraries/base/GHC/Conc/IOCP.hs @@ -0,0 +1,29 @@ +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE CPP, NoImplicitPrelude, MagicHash, UnboxedTuples #-} +{-# OPTIONS_GHC -Wno-missing-signatures #-} +{-# OPTIONS_HADDOCK not-home #-} + +----------------------------------------------------------------------------- +-- | +-- Module : GHC.Conc.IOCP +-- Copyright : (c) The University of Glasgow, 1994-2002 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC extensions) +-- +-- Windows I/O Completion Port interface to the one defined in +-- GHC.Event.Windows. +-- +-- This module is an indirection to keep things in the same structure as before +-- but also to keep the new code where the actual I/O manager is. As such it +-- just re-exports GHC.Event.Windows.Thread +-- +----------------------------------------------------------------------------- + +-- #not-home +module GHC.Conc.IOCP + ( module GHC.Event.Windows.Thread ) where + +import GHC.Event.Windows.Thread diff --git a/libraries/base/GHC/Conc/POSIX.hs b/libraries/base/GHC/Conc/POSIX.hs new file mode 100644 index 0000000000..9a5243c2c0 --- /dev/null +++ b/libraries/base/GHC/Conc/POSIX.hs @@ -0,0 +1,297 @@ +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE CPP, NoImplicitPrelude, MagicHash, UnboxedTuples #-} +{-# OPTIONS_GHC -Wno-missing-signatures #-} +{-# OPTIONS_HADDOCK not-home #-} + +----------------------------------------------------------------------------- +-- | +-- Module : GHC.Conc.POSIX +-- Copyright : (c) The University of Glasgow, 1994-2002 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC extensions) +-- +-- Windows I/O manager +-- TODO: Switch this to use new I/O manager. 
+-- +----------------------------------------------------------------------------- + +-- #not-home +module GHC.Conc.POSIX + ( ensureIOManagerIsRunning + , interruptIOManager + + -- * Waiting + , threadDelay + , registerDelay + + -- * Miscellaneous + , asyncRead + , asyncWrite + , asyncDoProc + + , asyncReadBA + , asyncWriteBA + + , module GHC.Event.Windows.ConsoleEvent + ) where + + +#include "windows_cconv.h" + +import Data.Bits (shiftR) +import GHC.Base +import GHC.Conc.Sync +import GHC.Conc.POSIX.Const +import GHC.Event.Windows.ConsoleEvent +import GHC.IO (unsafePerformIO) +import GHC.IORef +import GHC.MVar +import GHC.Num (Num(..)) +import GHC.Ptr +import GHC.Real (div, fromIntegral) +import GHC.Word (Word32, Word64) +import GHC.Windows +import Unsafe.Coerce ( unsafeCoerceUnlifted ) + +-- ---------------------------------------------------------------------------- +-- Thread waiting + +-- Note: threadWaitRead and threadWaitWrite aren't really functional +-- on Win32, but left in there because lib code (still) uses them (the manner +-- in which they're used doesn't cause problems on a Win32 platform though.) + +asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) +asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncRead# fd isSock len buf s of + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) + +asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int) +asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) = + IO $ \s -> case asyncWrite# fd isSock len buf s of + (# s', len#, err# #) -> (# s', (I# len#, I# err#) #) + +asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int +asyncDoProc (FunPtr proc) (Ptr param) = + -- the 'length' value is ignored; simplifies implementation of + -- the async*# primops to have them all return the same result. 
+ IO $ \s -> case asyncDoProc# proc param s of + (# s', _len#, err# #) -> (# s', I# err# #) + +-- to aid the use of these primops by the IO Handle implementation, +-- provide the following convenience funs: + +-- this better be a pinned byte array! +asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) +asyncReadBA fd isSock len off bufB = + asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerceUnlifted bufB))) `plusPtr` off) + +asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int) +asyncWriteBA fd isSock len off bufB = + asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerceUnlifted bufB))) `plusPtr` off) + +-- ---------------------------------------------------------------------------- +-- Threaded RTS implementation of threadDelay + +-- | Suspends the current thread for a given number of microseconds +-- (GHC only). +-- +-- There is no guarantee that the thread will be rescheduled promptly +-- when the delay has expired, but the thread will never continue to +-- run /earlier/ than specified. +-- +threadDelay :: Int -> IO () +threadDelay time + | threaded = waitForDelayEvent time + | otherwise = IO $ \s -> + case time of { I# time# -> + case delay# time# s of { s' -> (# s', () #) + }} + +-- | Set the value of returned TVar to True after a given number of +-- microseconds. The caveats associated with threadDelay also apply. 
+-- +registerDelay :: Int -> IO (TVar Bool) +registerDelay usecs + | threaded = waitForDelayEventSTM usecs + | otherwise = errorWithoutStackTrace "registerDelay: requires -threaded" + +foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool + +waitForDelayEvent :: Int -> IO () +waitForDelayEvent usecs = do + m <- newEmptyMVar + target <- calculateTarget usecs + _ <- atomicModifyIORef'_ pendingDelays (\xs -> Delay target m : xs) + prodServiceThread + takeMVar m + +-- Delays for use in STM +waitForDelayEventSTM :: Int -> IO (TVar Bool) +waitForDelayEventSTM usecs = do + t <- atomically $ newTVar False + target <- calculateTarget usecs + _ <- atomicModifyIORef'_ pendingDelays (\xs -> DelaySTM target t : xs) + prodServiceThread + return t + +calculateTarget :: Int -> IO USecs +calculateTarget usecs = do + now <- getMonotonicUSec + return $ now + (fromIntegral usecs) + +data DelayReq + = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ()) + | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool) + +{-# NOINLINE pendingDelays #-} +pendingDelays :: IORef [DelayReq] +pendingDelays = unsafePerformIO $ do + m <- newIORef [] + sharedCAF m getOrSetGHCConcWindowsPendingDelaysStore + +foreign import ccall unsafe "getOrSetGHCConcWindowsPendingDelaysStore" + getOrSetGHCConcWindowsPendingDelaysStore :: Ptr a -> IO (Ptr a) + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcWindowsIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcWindowsIOManagerThreadStore" + getOrSetGHCConcWindowsIOManagerThreadStore :: Ptr a -> IO (Ptr a) + +ensureIOManagerIsRunning :: IO () +ensureIOManagerIsRunning + | threaded = startIOManagerThread + | otherwise = return () + +interruptIOManager :: IO () +interruptIOManager = return () + +startIOManagerThread :: IO () +startIOManagerThread = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do 
t <- forkIO ioManager; return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> return (Just t) + +insertDelay :: DelayReq -> [DelayReq] -> [DelayReq] +insertDelay d [] = [d] +insertDelay d1 ds@(d2 : rest) + | delayTime d1 <= delayTime d2 = d1 : ds + | otherwise = d2 : insertDelay d1 rest + +delayTime :: DelayReq -> USecs +delayTime (Delay t _) = t +delayTime (DelaySTM t _) = t + +type USecs = Word64 +type NSecs = Word64 + +foreign import ccall unsafe "getMonotonicNSec" + getMonotonicNSec :: IO NSecs + +getMonotonicUSec :: IO USecs +getMonotonicUSec = fmap (`div` 1000) getMonotonicNSec + +{-# NOINLINE prodding #-} +prodding :: IORef Bool +prodding = unsafePerformIO $ do + r <- newIORef False + sharedCAF r getOrSetGHCConcWindowsProddingStore + +foreign import ccall unsafe "getOrSetGHCConcWindowsProddingStore" + getOrSetGHCConcWindowsProddingStore :: Ptr a -> IO (Ptr a) + +prodServiceThread :: IO () +prodServiceThread = do + -- NB. use atomicSwapIORef here, otherwise there are race + -- conditions in which prodding is left at True but the server is + -- blocked in select(). 
+ was_set <- atomicSwapIORef prodding True + when (not was_set) wakeupIOManager + +-- ---------------------------------------------------------------------------- +-- Windows IO manager thread + +ioManager :: IO () +ioManager = do + wakeup <- c_getIOManagerEvent + service_loop wakeup [] + +service_loop :: HANDLE -- I/O manager wakeup event + -> [DelayReq] -- current delay requests + -> IO () + +service_loop wakeup old_delays = do + -- pick up new delay requests + new_delays <- atomicSwapIORef pendingDelays [] + let delays = foldr insertDelay old_delays new_delays + + now <- getMonotonicUSec + (delays', timeout) <- getDelay now delays + + r <- c_WaitForSingleObject wakeup timeout + case r of + 0xffffffff -> do throwGetLastError "service_loop" + 0 -> do + r2 <- c_readIOManagerEvent + exit <- + case r2 of + _ | r2 == io_MANAGER_WAKEUP -> return False + _ | r2 == io_MANAGER_DIE -> return True + 0 -> return False -- spurious wakeup + _ -> do start_console_handler (r2 `shiftR` 1); return False + when (not exit) $ service_cont wakeup delays' + + _other -> service_cont wakeup delays' -- probably timeout + +service_cont :: HANDLE -> [DelayReq] -> IO () +service_cont wakeup delays = do + _ <- atomicSwapIORef prodding False + service_loop wakeup delays + +wakeupIOManager :: IO () +wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP + +-- Walk the queue of pending delays, waking up any that have passed +-- and returning the smallest delay to wait for. The queue of pending +-- delays is kept ordered. 
+getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD) +getDelay _ [] = return ([], iNFINITE) +getDelay now all@(d : rest) + = case d of + Delay time m | now >= time -> do + putMVar m () + getDelay now rest + DelaySTM time t | now >= time -> do + atomically $ writeTVar t True + getDelay now rest + _otherwise -> + -- delay is in millisecs for WaitForSingleObject + let micro_seconds = delayTime d - now + milli_seconds = (micro_seconds + 999) `div` 1000 + in return (all, fromIntegral milli_seconds) + +foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_getIOManagerEvent :: IO HANDLE + +foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_readIOManagerEvent :: IO Word32 + +foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c) + c_sendIOManagerEvent :: Word32 -> IO () + +foreign import WINDOWS_CCONV "WaitForSingleObject" + c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD + diff --git a/libraries/base/GHC/Conc/POSIX/Const.hsc b/libraries/base/GHC/Conc/POSIX/Const.hsc new file mode 100644 index 0000000000..6978d4f5ec --- /dev/null +++ b/libraries/base/GHC/Conc/POSIX/Const.hsc @@ -0,0 +1,30 @@ +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE NoImplicitPrelude #-} +{-# OPTIONS_GHC -Wno-missing-signatures #-} +{-# OPTIONS_HADDOCK not-home #-} + +----------------------------------------------------------------------------- +-- | +-- Module : GHC.Conc.POSIX.Const +-- Copyright : (c) The University of Glasgow, 2019 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC extensions) +-- +-- Constants shared with the rts, GHC.Conc.POSIX uses MagicHash which confuses +-- hsc2hs so these are moved to a new module. 
+-- +----------------------------------------------------------------------------- + +-- #not-home +module GHC.Conc.POSIX.Const where + +import Data.Word + +#include <Rts.h> + +io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32 +io_MANAGER_WAKEUP = #{const IO_MANAGER_WAKEUP} +io_MANAGER_DIE = #{const IO_MANAGER_DIE} diff --git a/libraries/base/GHC/Event/Internal/Types.hs b/libraries/base/GHC/Event/Internal/Types.hs new file mode 100644 index 0000000000..e02ff36b61 --- /dev/null +++ b/libraries/base/GHC/Event/Internal/Types.hs @@ -0,0 +1,160 @@ +{-# LANGUAGE NoImplicitPrelude #-} +------------------------------------------------------------------------------- +-- | +-- Module : GHC.Event.Internal.Types +-- Copyright : (c) Tamar Christina 2018 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable +-- +-- Core types of the event manager interface: I/O events, registration +-- lifetimes and timeouts. +-- +------------------------------------------------------------------------------- + +module GHC.Event.Internal.Types + ( + -- * Event type + Event + , evtRead + , evtWrite + , evtClose + , evtNothing + , eventIs + -- * Lifetimes + , Lifetime(..) + , EventLifetime + , eventLifetime + , elLifetime + , elEvent + -- * Timeout type + , Timeout(..) + ) where + +import Data.OldList (foldl', filter, intercalate, null) + +import Data.Bits ((.|.), (.&.)) +import Data.Semigroup.Internal (stimesMonoid) + +import GHC.Base +import GHC.Show (Show(..)) +import GHC.Word (Word64) + +-- | An I\/O event. +newtype Event = Event Int + deriving Eq -- ^ @since 4.4.0.0 + +evtNothing :: Event +evtNothing = Event 0 +{-# INLINE evtNothing #-} + +-- | Data is available to be read. +evtRead :: Event +evtRead = Event 1 +{-# INLINE evtRead #-} + +-- | The file descriptor is ready to accept a write. 
+evtWrite :: Event +evtWrite = Event 2 +{-# INLINE evtWrite #-} + +-- | Another thread closed the file descriptor. +evtClose :: Event +evtClose = Event 4 +{-# INLINE evtClose #-} + +eventIs :: Event -> Event -> Bool +eventIs (Event a) (Event b) = a .&. b /= 0 + +-- | @since 4.4.0.0 +instance Show Event where + show e = '[' : (intercalate "," . filter (not . null) $ + [evtRead `so` "evtRead", + evtWrite `so` "evtWrite", + evtClose `so` "evtClose"]) ++ "]" + where ev `so` disp | e `eventIs` ev = disp + | otherwise = "" + +-- | @since 4.10.0.0 +instance Semigroup Event where + (<>) = evtCombine + stimes = stimesMonoid + +-- | @since 4.4.0.0 +instance Monoid Event where + mempty = evtNothing + mconcat = evtConcat + +evtCombine :: Event -> Event -> Event +evtCombine (Event a) (Event b) = Event (a .|. b) +{-# INLINE evtCombine #-} + +evtConcat :: [Event] -> Event +evtConcat = foldl' evtCombine evtNothing +{-# INLINE evtConcat #-} + +-- | The lifetime of an event registration. +-- +-- @since 4.8.1.0 +data Lifetime = OneShot -- ^ the registration will be active for only one + -- event + | MultiShot -- ^ the registration will trigger multiple times + deriving ( Show -- ^ @since 4.8.1.0 + , Eq -- ^ @since 4.8.1.0 + ) + +-- | The longer of two lifetimes. +elSupremum :: Lifetime -> Lifetime -> Lifetime +elSupremum OneShot OneShot = OneShot +elSupremum _ _ = MultiShot +{-# INLINE elSupremum #-} + +-- | @since 4.10.0.0 +instance Semigroup Lifetime where + (<>) = elSupremum + stimes = stimesMonoid + +-- | @mappend@ takes the longer of two lifetimes. +-- +-- @since 4.8.0.0 +instance Monoid Lifetime where + mempty = OneShot + +-- | A pair of an event and lifetime +-- +-- Here we encode the event in the bottom three bits and the lifetime +-- in the fourth bit. +newtype EventLifetime = EL Int + deriving ( Show -- ^ @since 4.8.0.0 + , Eq -- ^ @since 4.8.0.0 + ) + +-- | @since 4.11.0.0 +instance Semigroup EventLifetime where + EL a <> EL b = EL (a .|. 
b) + +-- | @since 4.8.0.0 +instance Monoid EventLifetime where + mempty = EL 0 + +eventLifetime :: Event -> Lifetime -> EventLifetime +eventLifetime (Event e) l = EL (e .|. lifetimeBit l) + where + lifetimeBit OneShot = 0 + lifetimeBit MultiShot = 8 +{-# INLINE eventLifetime #-} + +elLifetime :: EventLifetime -> Lifetime +elLifetime (EL x) = if x .&. 8 == 0 then OneShot else MultiShot +{-# INLINE elLifetime #-} + +elEvent :: EventLifetime -> Event +elEvent (EL x) = Event (x .&. 0x7) +{-# INLINE elEvent #-} + +-- | A timeout: either a duration specified in nanoseconds, or 'Forever'. +data Timeout = Timeout {-# UNPACK #-} !Word64 + | Forever + deriving Show -- ^ @since 4.4.0.0 diff --git a/libraries/base/GHC/Event/TimeOut.hs b/libraries/base/GHC/Event/TimeOut.hs new file mode 100644 index 0000000000..7be0a4ebc4 --- /dev/null +++ b/libraries/base/GHC/Event/TimeOut.hs @@ -0,0 +1,40 @@ +{-# LANGUAGE NoImplicitPrelude #-} +------------------------------------------------------------------------------- +-- | +-- Module : GHC.Event.TimeOut +-- Copyright : (c) Tamar Christina 2018 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable +-- +-- Common Timer definitions shared between WinIO and RIO. +-- +------------------------------------------------------------------------------- + +module GHC.Event.TimeOut where + +import GHC.IO +import GHC.Base + +import qualified GHC.Event.PSQ as Q +import GHC.Event.Unique (Unique) + +-- | A priority search queue, with timeouts as priorities. +type TimeoutQueue = Q.PSQ TimeoutCallback + +-- | +-- Warning: since the 'TimeoutCallback' is called from the I/O manager, it must +-- not throw an exception or block for a long period of time. In particular, +-- be wary of 'Control.Exception.throwTo' and 'Control.Concurrent.killThread': +-- if the target thread is making a foreign call, these functions will block +-- until the call completes. 
+type TimeoutCallback = IO () + +-- | An edit to apply to a 'TimeoutQueue'. +type TimeoutEdit = TimeoutQueue -> TimeoutQueue + +-- | A timeout registration cookie. +newtype TimeoutKey = TK Unique + deriving (Eq, Ord) diff --git a/libraries/base/GHC/Event/Windows.hsc b/libraries/base/GHC/Event/Windows.hsc new file mode 100644 index 0000000000..4688075daf --- /dev/null +++ b/libraries/base/GHC/Event/Windows.hsc @@ -0,0 +1,1188 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DoAndIfThenElse #-} +{-# LANGUAGE ForeignFunctionInterface #-} +{-# LANGUAGE PatternGuards #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE CPP #-} + +------------------------------------------------------------------------------- +-- | +-- Module : GHC.Event.Windows +-- Copyright : (c) Tamar Christina 2018 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable +-- +-- WinIO Windows event manager. 
+-- +------------------------------------------------------------------------------- + +module GHC.Event.Windows ( + -- * Manager + Manager, + getSystemManager, + interruptSystemManager, + wakeupIOManager, + processRemoteCompletion, + + -- * Overlapped I/O + associateHandle, + associateHandle', + withOverlapped, + withOverlappedEx, + StartCallback, + StartIOCallback, + CbResult(..), + CompletionCallback, + LPOVERLAPPED, + + -- * Timeouts + TimeoutCallback, + TimeoutKey, + Seconds, + registerTimeout, + updateTimeout, + unregisterTimeout, + + -- * Utilities + withException, + ioSuccess, + ioFailed, + getLastError, + + -- * I/O Result type + IOResult(..), + + -- * I/O Event notifications + HandleData (..), -- seal for release + HandleKey (handleValue), + registerHandle, + unregisterHandle, + + -- * Console events + module GHC.Event.Windows.ConsoleEvent +) where + +-- define DEBUG 1 + +##include "windows_cconv.h" +#include <windows.h> +#include <ntstatus.h> +#include <Rts.h> +#include "winio_structs.h" + +import GHC.Event.Windows.Clock (Clock, Seconds, getClock, getTime) +import GHC.Event.Windows.FFI (LPOVERLAPPED, OVERLAPPED_ENTRY(..)) +import GHC.Event.Windows.ManagedThreadPool +import GHC.Event.Internal.Types +import qualified GHC.Event.Windows.FFI as FFI +import qualified GHC.Event.PSQ as Q +import qualified GHC.Event.IntTable as IT +import qualified GHC.Event.Internal as I + +import {-# SOURCE #-} Control.Concurrent +import Control.Concurrent.MVar +import Control.Exception as E +import Data.IORef +import Data.Foldable (mapM_, length, forM_) +import Data.Maybe +import Data.Word +import Data.Semigroup.Internal (stimesMonoid) +import Data.OldList (deleteBy) +import Foreign +import Foreign.ForeignPtr.Unsafe +import qualified GHC.Event.Array as A +import GHC.Base +import GHC.Conc.Sync (forkIO, showThreadId, + ThreadId(..), ThreadStatus(..), + threadStatus, sharedCAF) +import GHC.Event.Unique +import GHC.Event.TimeOut +import GHC.Event.Windows.ConsoleEvent +import 
GHC.IOPort +import GHC.Num +import GHC.Real +import GHC.Windows +import GHC.List (null) +import GHC.Ptr +import System.IO.Unsafe (unsafePerformIO) +import Text.Show +import GHC.RTS.Flags + +-- if defined(DEBUG) +#if 1 +import Foreign.C +import System.Posix.Internals (c_write) +import GHC.Conc.Sync (myThreadId) +#endif + +import qualified GHC.Windows as Win32 + +-- Note [WINIO Manager design] +-- This file contains the Windows I/O manager. Windows's IO subsystem is by +-- design fully asynchronous, however there are multiple ways and interfaces +-- to the async methods. +-- +-- The chosen Async interface for this implementation is using Completion Ports. +-- See also Note [Completion Ports]. The I/O manager uses a new interface added +-- in Windows Vista called `GetQueuedCompletionStatusEx` which allows us to +-- service multiple requests in one go. +-- +-- See https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/overview-of-the-windows-i-o-model +-- and https://www.microsoftpressstore.com/articles/article.aspx?p=2201309&seqNum=3 +-- +-- In order to understand this file, here is what you should know: +-- We're using relatively new APIs that allow us to service multiple requests at +-- the same time using one OS thread. This happens using so-called Completion +-- ports. All I/O actions get associated with one and the same completion port. +-- +-- The I/O manager itself has two modes of operation: +-- 1) Threaded: We have N dedicated OS threads in the Haskell world that service +-- completion requests. Everything is handled 100% in view of the runtime. +-- Whenever the OS has completions that need to be serviced it wakes up +-- one of the OS threads that are blocked in GetQueuedCompletionStatusEx and +-- lets it proceed with the list of completions that are finished. If more +-- completions finish before the first list is done being processed then +-- another thread is woken up. 
These threads are associated with the I/O +-- manager through the completion port. If it blocks for any reason the +-- I/O manager will wake up another thread from the pool to finish processing +-- the remaining entries. These worker threads must be able to handle the +-- fact that something else has finished the remainder of their queue or must +-- have a guarantee to never block. In this implementation we strive to +-- never block. This is achieved by not having the worker threads call out +-- to any user code, and to have the IOPort synchronization primitive never +-- block. This means if the port is full the message is lost, however we +-- have an invariant that the port can never be full and have a waiting +-- receiver. As such, dropping the message does not change anything as there +-- will never be anyone to receive it. i.e. it is an impossible situation to +-- land in. +-- 2) Non-threaded: We don't have any dedicated Haskell threads servicing +-- I/O Requests. Instead we have an OS thread inside the RTS that gets +-- notified of new requests and does the servicing. When a request completes +-- a Haskell thread is scheduled to run to finish off the processing of any +-- completed requests. See Note [Non-Threaded WINIO design]. +-- +-- These two modes of operation share the majority of the code and so they both +-- support the same operations and fixing one will fix the other. (See the step +-- function.) +-- Unlike MIO, we don't treat network I/O any differently than file I/O. Hence +-- any network specific code is now only in the network package. +-- +-- Note [Threaded WINIO design] +-- The threaded WinIO is designed around a simple blocking call that's called in +-- a service loop in a dedicated thread: `GetQueuedCompletionStatusEx`. +-- As such the loop is reasonably simple. We're either servicing finished +-- requests or blocking in `getQueuedCompletionStatusEx` waiting for new +-- requests to arrive. 
+-- +-- Each time a Handle is made three important things happen that affect the I/O +-- manager design: +-- 1) Files are opened with the `FILE_FLAG_OVERLAPPED` flag, which instructs the +-- OS that we will be doing purely asynchronous requests. See +-- `GHC.IO.Windows.Handle.openFile`. They are also opened with +-- `FILE_FLAG_SEQUENTIAL_SCAN` to indicate to the OS that we want to optimize +-- the access of the file for sequential access. (e.g. equivalent to MADVISE) +-- 2) The created handle is associated with the I/O manager's completion port. +-- This allows the I/O manager to be able to service I/O events from this +-- handle. See `associateHandle`. +-- 3) File handles are additionally modified with two optimization flags: +-- +-- FILE_SKIP_COMPLETION_PORT_ON_SUCCESS: If the request can be serviced +-- immediately, then do not queue the IRP (IO Request Packet) into the I/O +-- manager waiting for us to service it later. Instead service it +-- immediately in the same call. This is beneficial for two reasons: +-- 1) We don't have to block in the Haskell RTS. +-- 2) We save a bunch of work in the OS's I/O subsystem. +-- The downside is though that we have to do a bunch of work to handle these +-- cases. This is abstracted away from the user by the `withOverlapped` +-- function. +-- This together with the buffering strategy mentioned above means we +-- actually skip the I/O manager on quite a lot of I/O requests due to the +-- value being in the cache. Because of the Lazy I/O in Haskell, the time +-- to read and decode the buffer of bytes is usually longer than the OS needs +-- to read the next chunk, so we hit the FAST_IO IRP quite often. +-- +-- FILE_SKIP_SET_EVENT_ON_HANDLE: Since we will not be using an event object +-- to monitor asynchronous completions, don't bother updating or checking for +-- one. This saves some precious cycles, especially on operations with very +-- high number of I/O operations (e.g. servers.) 
+-- +-- So what does servicing a request actually mean? As mentioned before the +-- I/O manager will be blocked or servicing a request. In reality it doesn't +-- always block till an I/O request has completed. In cases where we have event +-- timers, we block till the next timer's timeout. This allows us to also +-- service timers in the same loop. The side effect of this is that we will +-- exit the I/O wait sometimes without any completions. Not really a problem +-- but it's an important design decision. +-- +-- Every time we wait, we give a pre-allocated buffer of `n` +-- `OVERLAPPED_ENTRIES` to the OS. This means that in a single call we can +-- service up to `n` I/O requests at a time. The size of `n` is not fixed: +-- any time we dequeue `n` I/O requests in a single operation we double the +-- buffer size, allowing the I/O manager to be able to scale up depending +-- on the workload. This buffer is kept alive throughout the lifetime of the +-- program and is never freed until the I/O manager is shutting down. +-- +-- One very important property of the I/O subsystem is that each I/O request +-- now requires an `OVERLAPPED` structure be given to the I/O manager. See +-- `withOverlappedEx`. This buffer is used by the OS to fill in various state +-- information. Throughout the duration of the I/O call, this buffer MUST +-- remain live. The address is pinned by the kernel, which means that the +-- pointer must remain accessible until `GetQueuedCompletionStatusEx` returns +-- the completion associated with the handle and not just until the call to +-- whatever I/O operation was used to initialize the I/O request returns. +-- The only exception to this is when the request has hit the FAST_IO path, in +-- which case it has skipped the I/O queue and so can be freed immediately after +-- reading the results from it. 
+-- +-- To prevent having to look up the Haskell payload in a shared state after the +-- request completes we attach it as part of the I/O request by extending the +-- `OVERLAPPED` structure. Instead of passing an `OVERLAPPED` structure to the +-- Windows API calls we instead pass a `HASKELL_OVERLAPPED` struct which has +-- as the first element an `OVERLAPPED` structure. This means when a request is +-- done all we need to do is cast the pointer back to `HASKELL_OVERLAPPED` and +-- read the accompanying data. This also means we don't have a global lock and +-- so can scale much more easily. + +-- --------------------------------------------------------------------------- +-- I/O manager resume/suspend code + +{-# NOINLINE ioManagerThread #-} +ioManagerThread :: MVar (Maybe ThreadId) +ioManagerThread = unsafePerformIO $ do + m <- newMVar Nothing + sharedCAF m getOrSetGHCConcWindowsIOManagerThreadStore + +foreign import ccall unsafe "getOrSetGHCConcWindowsIOManagerThreadStore" + getOrSetGHCConcWindowsIOManagerThreadStore :: Ptr a -> IO (Ptr a) + +-- --------------------------------------------------------------------------- +-- Non-threaded I/O manager callback hooks. See `ASyncWinIO.c` + +foreign import ccall safe "registerNewIOCPHandle" + registerNewIOCPHandle :: FFI.IOCP -> IO () + +foreign import ccall safe "registerAlertableWait" + registerAlertableWait :: FFI.IOCP -> DWORD -> Word64 -> IO () + +foreign import ccall safe "getOverlappedEntries" + getOverlappedEntries :: Ptr DWORD -> IO (Ptr OVERLAPPED_ENTRY) + +foreign import ccall safe "servicedIOEntries" + servicedIOEntries :: Word64 -> IO () + +foreign import ccall safe "completeSynchronousRequest" + completeSynchronousRequest :: IO () + +------------------------------------------------------------------------ +-- Manager structures + +-- | Callback type that will be called when an I/O operation completes. +type IOCallback = CompletionCallback () + +-- | Wrap the IOCallback type into a FunPtr. 
+foreign import ccall "wrapper" + wrapIOCallback :: IOCallback -> IO (FunPtr IOCallback) + +-- | Unwrap a FunPtr IOCallback to a normal Haskell function. +foreign import ccall "dynamic" + mkIOCallback :: FunPtr IOCallback -> IOCallback + +-- | Structure that the I/O manager uses to associate callbacks with +-- its additional payload such as its OVERLAPPED structure and Win32 handle +-- etc. Must be kept in sync with that in `winio_structs.h` or horrible things +-- happen. +data CompletionData = CompletionData { cdHandle :: !HANDLE + , cdCallback :: !IOCallback + } + +instance Storable CompletionData where + sizeOf _ = #{size CompletionData} + alignment _ = #{alignment CompletionData} + + peek ptr = do + cdHandle <- #{peek CompletionData, cdHandle} ptr + cdCallback <- mkIOCallback `fmap` #{peek CompletionData, cdCallback} ptr + let !cd = CompletionData{..} + return cd + + poke ptr CompletionData{..} = do + #{poke CompletionData, cdHandle} ptr cdHandle + cb <- wrapIOCallback cdCallback + #{poke CompletionData, cdCallback} ptr cb + +-- | Pointer offset in bytes to the location of hoData in HASKELL_OVERLAPPED +cdOffset :: Int +cdOffset = #{const __builtin_offsetof (HASKELL_OVERLAPPED, hoData)} + +-- | Terminator symbol for IOCP request +nullReq :: Ptr (Ptr a) +nullReq = castPtr $ unsafePerformIO $ new $ (nullPtr :: Ptr ()) + +-- I don't expect a lot of events, so a simple linked list should be enough. +type EventElements = [(Event, HandleData)] +data EventData = EventData { evtTopLevel :: !Event, evtElems :: !EventElements } + +instance Monoid EventData where + mempty = EventData evtNothing [] + mappend = (<>) + +instance Semigroup EventData where + (<>) = \a b -> EventData (evtTopLevel a <> evtTopLevel b) + (evtElems a ++ evtElems b) + stimes = stimesMonoid + +data IOResult a + = IOSuccess { ioValue :: a } + | IOFailed { ioErrCode :: Maybe Int } + +-- | The state object for the I/O manager. 
This structure is available for both +-- the threaded and the non-threaded RTS. +data Manager = Manager + { mgrIOCP :: {-# UNPACK #-} !FFI.IOCP + , mgrClock :: !Clock + , mgrUniqueSource :: {-# UNPACK #-} !UniqueSource + , mgrTimeouts :: {-# UNPACK #-} !(IORef TimeoutQueue) + , mgrEvntHandlers :: {-# UNPACK #-} + !(MVar (IT.IntTable EventData)) + , mgrOverlappedEntries + :: {-#UNPACK #-} !(A.Array OVERLAPPED_ENTRY) + , mgrThreadPool :: Maybe ThreadPool + } + +-- | Create a new I/O manager. In the Threaded I/O manager this call doesn't +-- have any side effects, but in the Non-Threaded I/O manager the newly +-- created IOCP handle will be registered with the RTS. Users should never +-- call this. +-- +-- NOTE: This needs to finish without making any calls to anything requiring the +-- I/O manager otherwise we'll get into some weird synchronization issues. +-- Essentially this means avoid using long running operations here. +newManager :: IO Manager +newManager = do + debugIO "Starting io-manager..." + mgrIOCP <- FFI.newIOCP + when (not threaded) $ + registerNewIOCPHandle mgrIOCP + debugIO $ "iocp: " ++ show mgrIOCP + mgrClock <- getClock + mgrUniqueSource <- newSource + mgrTimeouts <- newIORef Q.empty + mgrOverlappedEntries <- A.new 64 + mgrEvntHandlers <- newMVar =<< IT.new callbackArraySize + let mgrThreadPool = Nothing + + let !mgr = Manager{..} + return mgr + +{-# INLINE startIOManagerThread #-} +-- | Starts a new I/O manager thread. +-- For the threaded runtime it creates a pool of OS threads which stays alive +-- until they are instructed to die. For the non-threaded runtime we have a +-- single worker thread in the C runtime. +startIOManagerThread :: IO () -> IO () +startIOManagerThread loop = do + modifyMVar_ ioManagerThread $ \old -> do + let create = do debugIO "spawning worker threads.." + t <- if threaded + then forkOS loop + else forkIO loop + setStatus WinIORunning + debugIO $ "created io-manager threads." 
+ return (Just t) + case old of + Nothing -> create + Just t -> do + s <- threadStatus t + case s of + ThreadFinished -> create + ThreadDied -> create + _other -> do status <- getStatus + case status of + WinIOBlocked -> do + c_sendIOManagerEvent io_MANAGER_WAKEUP + debugIO $ "woke up manager on thread: " + ++ showThreadId t + WinIOScanning -> do + debugIO $ "interrupted IOCP timeout wait on thread: " + ++ showThreadId t + WinIOWaiting -> do + debugIO $ "interrupted IOCP long wait on thread: " + ++ showThreadId t + _ -> return () + when (status /= WinIORunning) + interruptSystemManager + return (Just t) + +-- | The various states the I/O manager can be in. Used mostly for internal +-- bookkeeping and to make certain operations idempotent. +data WinIOStatus + = WinIORunning -- ^ I/O manager is running and doing something. + | WinIOScanning -- ^ The I/O manager has been interrupted without servicing + -- a request. Likely due to a timer elapsing. + | WinIOWaiting -- ^ I/O manager is blocked on an alert-able wait for I/O + -- completions. + | WinIOBlocked -- ^ I/O manager is not servicing any I/O requests but the + -- thread is still alive. This is usually the result of + -- a user requested event. + | WinIODone -- The I/O manager was requested to terminate and has done so. 
+ deriving Eq + + +statusWinIO :: MVar WinIOStatus +statusWinIO = unsafePerformIO $ newMVar WinIODone + +setStatus :: WinIOStatus -> IO () +setStatus val = modifyMVar_ statusWinIO (\_ -> return val) + +getStatus :: IO WinIOStatus +getStatus = readMVar statusWinIO + +requests :: MVar Word64 +requests = unsafePerformIO $ newMVar 0 + +addRequest :: IO Word64 +addRequest = modifyMVar requests (\x -> return (x + 1, x + 1)) + +removeRequest :: IO Word64 +removeRequest = modifyMVar requests (\x -> return (x - 1, x - 1)) + +outstandingRequests :: IO Word64 +outstandingRequests = withMVar requests return + +getSystemManager :: IO Manager +getSystemManager = readMVar managerRef + +-- | Mutable reference to the IO manager +managerRef :: MVar Manager +managerRef = unsafePerformIO $ newManager >>= newMVar +{-# NOINLINE managerRef #-} + +-- | Interrupts an I/O manager Wait. This will force the I/O manager to process +-- any outstanding events and timers. Also called when console events such as +-- ctrl+c are used to break abort an I/O request. +interruptSystemManager :: IO () +interruptSystemManager = do + mgr <- getSystemManager + status <- getStatus + when (status /= WinIORunning) $ + do debugIO "interrupt received.." + FFI.postQueuedCompletionStatus (mgrIOCP mgr) 0 0 nullPtr + when (status == WinIODone) $ + do debugIO $ "I/O manager is dead. You need to revive it first. " + ++ "Try wakeupIOManager instead." + + +-- | The initial number of I/O requests we can service at the same time. +-- Must be power of 2. This number is used as the starting point to scale +-- the number of concurrent requests. It will be doubled everytime we are +-- saturated. 
callbackArraySize :: Int
callbackArraySize = 32

-----------------------------------------------------------------------
-- Time utilities

-- | Convert a duration in seconds to whole nanoseconds, rounding up.
secondsToNanoSeconds :: Seconds -> Q.Prio
secondsToNanoSeconds secs = ceiling (secs * nanosPerSecond)
  where nanosPerSecond = 1000000000

-- | Convert a nanosecond count back to (fractional) seconds.
nanoSecondsToSeconds :: Q.Prio -> Seconds
nanoSecondsToSeconds nanos = fromIntegral nanos / nanosPerSecond
  where nanosPerSecond = 1000000000.0

------------------------------------------------------------------------
-- Overlapped I/O

-- | Callback that starts the overlapped I/O operation.
-- It must return successfully if and only if an I/O completion has been
-- queued.  Otherwise, it must throw an exception, which 'withOverlapped'
-- will rethrow.
type StartCallback a = LPOVERLAPPED -> IO a

-- | Specialized callback type for I/O Completion Ports calls using
-- withOverlapped.
type StartIOCallback a = StartCallback (CbResult a)

-- | CallBack result type to disambiguate between the different states
-- an I/O Completion call could be in.
data CbResult a
  = CbDone (Maybe DWORD) -- ^ Request was handled immediately, no queue.
  | CbPending            -- ^ Queued and handled by I/O manager
  | CbIncomplete         -- ^ I/O request is incomplete but not enqueued, handle
                         --   it synchronously.
  | CbError a            -- ^ I/O request abort, return failure immediately
  | CbNone Bool          -- ^ The caller did not do any checking, the I/O
                         --   manager will perform additional checks.

-- | Called when the completion is delivered.
type CompletionCallback a = ErrCode   -- ^ 0 indicates success
                          -> DWORD     -- ^ Number of bytes transferred
                          -> IO a

-- | Associate a 'HANDLE' with the current I/O manager's completion port.
-- This must be done before using the handle with 'withOverlapped'.
associateHandle' :: HANDLE -> IO ()
associateHandle' hwnd =
  getSystemManager >>= \mngr -> associateHandle mngr hwnd

-- | Associate a 'HANDLE' with the I/O manager's completion port.  This must be
-- done before using the handle with 'withOverlapped'.
associateHandle :: Manager -> HANDLE -> IO ()
associateHandle Manager{..} h =
    -- Use as completion key the file handle itself, so we can track completion
    FFI.associateHandleWithIOCP mgrIOCP h (fromIntegral $ ptrToWordPtr h)

-- | Start an overlapped I/O operation, and wait for its completion.  If
-- 'withOverlapped' is interrupted by an asynchronous exception, the operation
-- will be canceled using @CancelIoEx@.
--
-- 'withOverlapped' waits for a completion to arrive before returning or
-- throwing an exception.  This means you can use functions like
-- 'Foreign.Marshal.Alloc.alloca' to allocate buffers for the operation.
--
-- The start callback's 'CbResult' decides the path taken: 'CbPending' blocks
-- on an 'IOPort' until the manager signals completion, 'CbDone' and 'CbError'
-- invoke the completion callback directly on the calling thread.
withOverlappedEx :: Manager
                 -> String
                 -> HANDLE
                 -> Word64 -- ^ Value to use for the @OVERLAPPED@
                           --   structure's Offset/OffsetHigh members.
                 -> StartIOCallback Int
                 -> CompletionCallback (IOResult a)
                 -> IO (IOResult a)
withOverlappedEx mgr fname h offset startCB completionCB = do
    -- Port the waiting thread blocks on until the request is signalled.
    signal <- newEmptyIOPort :: IO (IOPort (IOResult a))
    let dbg s = s ++ " (" ++ show h ++ ":" ++ show offset ++ ")"
    let signalReturn a = failIfFalse_ (dbg "signalReturn") $
                            writeIOPort signal (IOSuccess a)
        signalThrow ex = failIfFalse_ (dbg "signalThrow") $
                            writeIOPort signal (IOFailed ex)
    mask_ $ do
      -- Completion callback stored in the request: runs the user's callback
      -- and forwards its result to the waiting thread through 'signal'.
      let completionCB' e b = completionCB e b >>= \result ->
                                case result of
                                  IOSuccess val -> signalReturn val
                                  IOFailed  err -> signalThrow err
      hs_lpol <- FFI.allocOverlapped offset
      -- Create the completion record and store it.
      -- We only need the record when we enqueue a request, however if we
      -- delay creating it then we will run into a race condition where the
      -- driver may have finished servicing the request before we were ready
      -- and so the request won't have the book keeping information to know
      -- what to do.  So because of that we always create the payload,  If we
      -- need it ok, if we don't that's no problem.  This approach prevents
      -- expensive lookups in hash-tables.
      --
      -- Todo: Use a memory pool for this so we don't have to hit malloc every
      --       time.  This would allow us to scale better.
      cdData <- new (CompletionData h completionCB')
      let ptr_lpol = hs_lpol `plusPtr` cdOffset
      poke ptr_lpol cdData
      let lpol = castPtr hs_lpol
      debugIO $ "hs_lpol:" ++ show hs_lpol
              ++ " cdData:" ++ show cdData
              ++ " ptr_lpol:" ++ show ptr_lpol

      execute <- startCB lpol `onException`
                        (CbError `fmap` Win32.getLastError) >>= \result -> do
        -- Check to see if the operation was completed on a
        -- non-overlapping handle or was completed immediately.
        -- e.g. stdio redirection or data in cache, FAST I/O.
        success <- FFI.overlappedIOStatus lpol
        err     <- fmap fromIntegral getLastError
        -- Determine if the caller has done any checking.  If not then check
        -- to see if the request was completed synchronously.  We have to
        -- in order to prevent deadlocks since if it has completed
        -- synchronously we've requested to not have the completion queued.
        let result' =
              case result of
                CbNone ret | success == #{const STATUS_SUCCESS}     -> CbDone Nothing
                           | success == #{const STATUS_END_OF_FILE} -> CbDone Nothing
                           | success == #{const STATUS_PENDING}     -> CbPending
                           -- Buffer was too small.. not sure what to do, so I'll just
                           -- complete the read request
                           | err == #{const ERROR_MORE_DATA}        -> CbDone Nothing
                           | err == #{const ERROR_SUCCESS}          -> CbDone Nothing
                           | err == #{const ERROR_IO_PENDING}       -> CbPending
                           | err == #{const ERROR_IO_INCOMPLETE}    -> CbIncomplete
                           | err == #{const ERROR_HANDLE_EOF}       -> CbDone Nothing
                           | not ret                                -> CbError err
                           | otherwise                              -> CbPending
                _                                                   -> result
        case result' of
          -- Every CbNone was rewritten to another constructor above.
          CbNone    _ -> error "shouldn't happen."
          CbIncomplete -> do
             debugIO $ "handling incomplete request synchronously " ++ show (h, lpol)
             res <- spinWaitComplete h lpol
             debugIO $ "done blocking request " ++ show (h, lpol)
             return res
          CbPending   -> do
            -- Before we enqueue check to see if operation finished in the
            -- mean time, since caller may not have done this.
            -- Normally we'd have to clear lpol with 0 before this call,
            -- however the statuses we're interested in would not get to here
            -- so we can save the memset call.
            finished <- FFI.getOverlappedResult h lpol False
            debugIO $ "== " ++ show (finished)
            status <- FFI.overlappedIOStatus lpol
            debugIO $ "== >< " ++ show (status)
            lasterr <- fmap fromIntegral getLastError :: IO Int
            -- This status indicated that we have finished early and so we
            -- won't have a request enqueued.  Handle it inline.
            let done_early = status == #{const STATUS_SUCCESS}
                             || status == #{const STATUS_END_OF_FILE}
                             || lasterr == #{const ERROR_HANDLE_EOF}
                             || lasterr == #{const ERROR_SUCCESS}
            -- This status indicates that the request hasn't finished early,
            -- but it will finish shortly.  The I/O manager will not be
            -- enqueuing this either.  Also needs to be handled inline.
            let will_finish_sync = lasterr == #{const ERROR_IO_INCOMPLETE}

            debugIO $ "== >*< " ++ show (finished, done_early, will_finish_sync, h, lpol, lasterr)
            case (finished, done_early, will_finish_sync) of
              -- Genuinely pending: count it and make sure the manager runs.
              (Nothing, False, False) -> do
                  reqs <- addRequest
                  debugIO $ "+1.. " ++ show reqs ++ " requests queued. | " ++ show lpol
                  wakeupIOManager
                  return result'
              (Nothing, False, True) -> do
                  debugIO $ "handling incomplete request synchronously " ++ show (h, lpol)
                  res <- spinWaitComplete h lpol
                  debugIO $ "done blocking request " ++ show (h, lpol)
                  return res
              _ -> do
                  debugIO "request handled immediately (o/b), not queued."
                  return $ CbDone finished
          CbError err' -> signalThrow (Just err') >> return result'
          CbDone  _   -> do
            debugIO "request handled immediately (o), not queued." >> return result'

      -- Exception handler used while waiting on 'signal': cancels the
      -- request, waits for the cancellation to complete, and releases the
      -- bookkeeping only if the manager has not already claimed it.
      let cancel e = do
                        debugIO $ "## Exception occurred. Cancelling request... "
                        debugIO $ show (e :: SomeException)
                        _ <- uninterruptibleMask_ $ FFI.cancelIoEx' h lpol
                        -- we need to wait for the cancellation before removing
                        -- the pointer.
                        debugIO $ "## Waiting for cancellation record... "
                        _ <- FFI.getOverlappedResult h lpol True
                        let oldDataPtr = exchangePtr ptr_lpol nullReq
                        -- Check if we have to free and cleanup pointer
                        when (oldDataPtr == cdData) $
                          do free oldDataPtr
                             free hs_lpol
                             reqs <- removeRequest
                             debugIO $ "-1.. " ++ show reqs ++ " requests queued after error."
                             status <- fmap fromIntegral getLastError
                             completionCB' status 0
                        when (not threaded) $
                          do num_remaining <- outstandingRequests
                             servicedIOEntries num_remaining
                        return $ IOFailed Nothing
      -- Block until the request is signalled, turning a failure result into
      -- a thrown Windows error.
      let runner = do debugIO $ (dbg ":: waiting ") ++ " | " ++ show lpol
                      res <- readIOPort signal `catch` cancel
                      debugIO $ dbg ":: signaled "
                      case res of
                        IOFailed err -> FFI.throwWinErr fname (maybe 0 fromIntegral err)
                        _            -> return res

      -- Sometimes we shouldn't bother with the I/O manager as the call has
      -- failed or is done.
      -- NOTE(review): only the CbError and fall-through branches free cdData
      -- and hs_lpol; in the CbDone branch the frees are commented out.
      case execute of
        CbPending    -> runner
        CbDone rdata -> do
          -- free cdData
          debugIO $ dbg $ ":: done " ++ show lpol ++ " - " ++ show rdata
          bytes <- if isJust rdata
                      then return rdata
                      -- Make sure it's safe to free the OVERLAPPED buffer
                      else FFI.getOverlappedResult h lpol False
          case bytes of
            Just res -> completionCB 0 res -- free hs_lpol >> completionCB 0 res
            Nothing  -> do err <- FFI.overlappedIOStatus lpol
                           numBytes <- FFI.overlappedIONumBytes lpol
                           -- TODO: Remap between STATUS_ and ERROR_ instead
                           -- of re-interpret here. But for now, don't care.
                           let err' = fromIntegral err
                           -- free hs_lpol
                           completionCB err' (fromIntegral numBytes)
        CbError err  -> do
          free cdData
          free hs_lpol
          let err' = fromIntegral err
          completionCB err' 0
        _            -> do
          free cdData
          free hs_lpol
          error "unexpected case in `execute'"
  where spinWaitComplete fhndl lpol = do
          -- Wait for the request to finish as it was running before and
          -- The I/O manager won't enqueue it due to our optimizations to
          -- prevent context switches in such cases.
          res <- FFI.getOverlappedResult fhndl lpol False
          status <- FFI.overlappedIOStatus lpol
          case res of
            -- Uses an inline definition of threadDelay to prevent an import
            -- cycle.
            Nothing | status == #{const STATUS_END_OF_FILE} -> do
                        when (not threaded) completeSynchronousRequest
                        return $ CbDone res
                    | otherwise ->
                      -- Not finished yet: sleep for 100 microseconds via a
                      -- manager timeout, then poll again.
                      do m <- newEmptyIOPort
                         let secs = 100 / 1000000.0
                         reg <- registerTimeout mgr secs $
                                    writeIOPort m () >> return ()
                         readIOPort m `onException` unregisterTimeout mgr reg
                         spinWaitComplete fhndl lpol
            _ -> do
              when (not threaded) completeSynchronousRequest
              return $ CbDone res

-- | Same as 'withOverlappedEx' but using the process-wide I/O manager
-- obtained from 'getSystemManager'.
withOverlapped :: String
               -> HANDLE
               -> Word64 -- ^ Value to use for the @OVERLAPPED@
                         --   structure's Offset/OffsetHigh members.
               -> StartIOCallback Int
               -> CompletionCallback (IOResult a)
               -> IO (IOResult a)
withOverlapped fname h offset startCB completionCB = do
  mngr <- getSystemManager
  withOverlappedEx mngr fname h offset startCB completionCB

------------------------------------------------------------------------
-- I/O Utilities

-- | Process an IOResult and throw an exception back to the user if the action
-- has failed, or return the result.
withException :: String -> IO (IOResult a) -> IO a
withException name fn = fn >>= unwrap
  where
    -- A missing error code is reported as code 0.
    unwrap (IOSuccess a)         = return a
    unwrap (IOFailed (Just err)) = FFI.throwWinErr name $ fromIntegral err
    unwrap (IOFailed Nothing)    = FFI.throwWinErr name 0

-- | Signal that the I/O action was successful.
ioSuccess :: a -> IO (IOResult a)
ioSuccess val = return (IOSuccess val)

-- | Signal that the I/O action has failed with the given reason.
ioFailed :: Integral a => a -> IO (IOResult a)
ioFailed code = return (IOFailed (Just (fromIntegral code)))

------------------------------------------------------------------------
-- Timeouts

-- | Register an action to be performed in the given number of seconds.  The
-- returned 'TimeoutKey' can be used to later un-register or update the timeout.
-- The timeout is automatically unregistered when it fires.
--
-- A non-positive delay runs the callback immediately on the calling thread
-- instead of queueing it.
--
-- The 'TimeoutCallback' will not be called more than once.
registerTimeout :: Manager -> Seconds -> TimeoutCallback -> IO TimeoutKey
registerTimeout mgr@Manager{..} relTime cb = do
    key <- newUnique mgrUniqueSource
    if relTime <= 0 then cb else schedule key
    return (TK key)
  where
    -- Insert the callback into the timeout queue at its absolute expiry.
    schedule key = do
      now <- getTime mgrClock
      let !expTime = secondsToNanoSeconds (now + relTime)
      editTimeouts mgr (Q.unsafeInsertNew key expTime cb)

-- | Update an active timeout to fire in the given number of seconds (from the
-- time 'updateTimeout' is called), instead of when it was going to fire.
-- This has no effect if the timeout has already fired.
updateTimeout :: Manager -> TimeoutKey -> Seconds -> IO ()
updateTimeout mgr (TK key) relTime = do
    now <- getTime (mgrClock mgr)
    let !expTime = secondsToNanoSeconds (now + relTime)
    editTimeouts mgr (Q.adjust (\_ -> expTime) key)

-- | Unregister an active timeout.  This is a harmless no-op if the timeout is
-- already unregistered or has already fired.
--
-- Warning: the timeout callback may fire even after
-- 'unregisterTimeout' completes.
unregisterTimeout :: Manager -> TimeoutKey -> IO ()
unregisterTimeout mgr (TK key) = editTimeouts mgr (Q.delete key)

-- | Apply an edit to the timeout queue and wake the I/O manager so it can
-- pick up the change.  If the timer was about to elapse it may fire anyway.
editTimeouts :: Manager -> TimeoutEdit -> IO ()
editTimeouts mgr g = do
    atomicModifyIORef' (mgrTimeouts mgr) applyEdit
    wakeupIOManager
  where
    applyEdit queue = (g queue, ())

------------------------------------------------------------------------
-- I/O manager loop

-- | Call all expired timeouts, and return how much time until the next
-- expiration ('Nothing' when no timeout remains queued).
runExpiredTimeouts :: Manager -> IO (Maybe Seconds)
runExpiredTimeouts Manager{..} = do
    now <- getTime mgrClock
    (expired, delay) <- atomicModifyIORef' mgrTimeouts (splitExpired now)
    -- Execute timeout callbacks.
    mapM_ Q.value expired
    when (not threaded && not (null expired))
      completeSynchronousRequest
    debugIO $ "expired calls: " ++ show (length expired)
    return delay
  where
    -- Split the queue into the entries due at or before 'now' and the rest,
    -- and work out how long until the earliest remaining entry.
    splitExpired :: Seconds -> TimeoutQueue
                 -> (TimeoutQueue, ([Q.Elem TimeoutCallback], Maybe Seconds))
    splitExpired now tq =
      let nowNs              = secondsToNanoSeconds now
          (expired, pending) = Q.atMost nowNs tq
          -- Always positive: 'atMost' removed everything <= nowNs.
          untilNext          = fmap (\e -> Q.prio e - nowNs) (Q.findMin pending)
      in (pending, (expired, fmap nanoSecondsToSeconds untilNext))

-- | Return the delay argument to pass to GetQueuedCompletionStatus.
-- Converts seconds to milliseconds, rounding up, and caps the wait at
-- 120000 ms; 'Nothing' also yields the cap and non-positive values yield 0.
fromTimeout :: Maybe Seconds -> Word32
fromTimeout Nothing                 = 120000
fromTimeout (Just sec) | sec > 120  = 120000
                       | sec > 0    = ceiling (sec * 1000)
                       | otherwise  = 0

-- | Perform one full evaluation step of the I/O manager's service loop.
-- This means process timeouts and completed completions and calculate the time
-- for the next timeout.
--
-- The I/O manager is then notified of how long it should block again based on
-- the queued I/O requests and timers.  If the I/O manager was given a command
-- to block, shutdown or suspend then that request is honored at the end of the
-- loop.
--
-- The 'Bool' argument allows an unbounded wait when no timer is pending.
-- The result is whatever 'processCompletion' returns.
step :: Bool -> Manager -> IO (Bool, Maybe Seconds)
step maxDelay mgr@Manager{..} = do
    -- Determine how long to wait the next time we block in an alertable state.
    delay <- runExpiredTimeouts mgr
    let !timer = if maxDelay && delay == Nothing
                    then #{const INFINITE}
                    else fromTimeout delay
    debugIO $ "next timeout: " ++ show delay
    debugIO $ "next timer: " ++ show timer -- todo: print as hex
    -- Publish the kind of wait we are about to enter so that other threads
    -- know how to wake us (see 'startIOManagerThread').
    case (maxDelay, delay) of
      (_    , Just{} ) -> do setStatus WinIOWaiting
                             debugIO "I/O manager waiting."
      (False, Nothing) -> do setStatus WinIOScanning
                             debugIO "I/O manager pausing."
      (True , Nothing) -> do setStatus WinIOBlocked
                             debugIO "I/O manager deep sleep."
    -- If threaded this call informs the threadpool that a thread is now
    -- entering a kernel mode wait and this is free to be used.  If non-threaded
    -- then this is a no-op.
    notifyWaiting mgrThreadPool
    n <- if threaded
         -- To quote Matt Godbolt:
         -- There are some unusual edge cases you need to deal with. The
         -- GetQueuedCompletionStatus function blocks a thread until there's
         -- work for it to do. Based on the return value, the number of bytes
         -- and the overlapped structure, there's a lot of possible "reasons"
         -- for the function to have returned. Deciphering all the possible
         -- cases:
         --
         -- ------------------------------------------------------------------------
         -- Ret value | OVERLAPPED | # of bytes | Description
         -- ------------------------------------------------------------------------
         -- zero      | NULL       | n/a        | Call to GetQueuedCompletionStatus
         --   failed, and no data was dequeued from the IO port. This usually
         --   indicates an error in the parameters to GetQueuedCompletionStatus.
         --
         -- zero      | non-NULL   | n/a        | Call to GetQueuedCompletionStatus
         --   failed, but data was read or written. The thread must deal with the
         --   data (possibly freeing any associated buffers), but there is an error
         --   condition on the underlying HANDLE. Usually seen when the other end of
         --   a network connection has been forcibly closed but there's still data in
         --   the send or receive queue.
         --
         -- non-zero  | NULL       | n/a        | This condition doesn't happen due
         --   to IO requests, but is useful to use in combination with
         --   PostQueuedCompletionStatus as a way of indicating to threads that they
         --   should terminate.
         --
         -- non-zero  | non-NULL   | zero       | End of file for a file HANDLE, or
         --   the connection has been gracefully closed (for network connections).
         --   The OVERLAPPED buffer has still been used; and must be deallocated if
         --   necessary.
         --
         -- non-zero  | non-NULL   | non-zero   | "num bytes" of data have been
         --   transferred into the block pointed by the OVERLAPPED structure. The
         --   direction of the transfer is dependent on the call made to the IO
         --   port, it's up to the user to remember if it was a read or a write
         --   (usually by stashing extra data in the OVERLAPPED structure). The
         --   thread must deallocate the structure as necessary.
         --
         -- The getQueuedCompletionStatusEx call will remove entries queued by the OS
         -- and returns the finished ones in mgrOverlappedEntries and the number of
         -- entries removed.
         then FFI.getQueuedCompletionStatusEx mgrIOCP mgrOverlappedEntries timer
         -- Non-threaded: the wait itself is delegated to the RTS, so no
         -- entries are reported from here.
         else do num_req <- outstandingRequests
                 registerAlertableWait mgrIOCP timer num_req
                 return 0
    setStatus WinIORunning
    -- If threaded this call informs the threadpool manager that a thread is
    -- busy.  If all threads are busy and we have not reached the maximum amount
    -- of allowed threads then the threadpool manager will spawn a new thread to
    -- allow us to scale under load.
    notifyRunning mgrThreadPool
    processCompletion mgr n delay

-- | Process the results at the end of an evaluation loop.  This function will
-- read all the completions, wake up all the Haskell threads, clean up the book
-- keeping of the I/O manager and return whether there is outstanding work to
-- be done and how long it expects to have to wait till it can take action
-- again.
--
-- Note that this method can do less work than there are entries in the
-- completion table.  This is because some completion entries may have been
-- created due to calls to interruptIOManager which will enqueue a faux
-- completion.
--
-- NOTE: In Threaded mode things get a bit complicated the operation may have
-- been completed even before we even got around to put the request in the
-- waiting callback table.  These events are handled by having a separate queue
-- for orphaned callback instances that the calling thread is supposed to check
-- before adding something to the work queue.
processCompletion :: Manager -> Int -> Maybe Seconds -> IO (Bool, Maybe Seconds)
processCompletion Manager{..} n delay = do
    -- If some completions are done, we need to process them and call their
    -- callbacks.  We then remove the callbacks from the bookkeeping and resize
    -- the index if required.
    when (n > 0) $ do
      forM_ [0..(n-1)] $ \idx -> do
        oe <- A.unsafeRead mgrOverlappedEntries idx
        let lpol     = lpOverlapped oe
        -- Entries without an OVERLAPPED pointer carry no payload; skip them.
        when (lpol /= nullPtr) $ do
          let hs_lpol  = castPtr lpol :: Ptr FFI.HASKELL_OVERLAPPED
          let ptr_lpol = castPtr (hs_lpol `plusPtr` cdOffset) :: Ptr (Ptr CompletionData)
          cdDataCheck <- peek ptr_lpol
          debugIO $ " $ checking " ++ show lpol
                    ++ " -en ptr_lpol: " ++ show ptr_lpol
                    ++ " offset: " ++ show cdOffset
                    ++ " cdData: " ++ show cdDataCheck
                    ++ " at idx " ++ show idx
          -- Swap in the terminator; getting it back means the payload was
          -- already claimed, so the callback must not run again.
          let oldDataPtr = exchangePtr ptr_lpol nullReq
          when (oldDataPtr /= nullReq) $
            do payload <- peek oldDataPtr
               debugIO $ "exchanged: " ++ show oldDataPtr
               let !(CompletionData _hwnd cb) = payload
               -- free oldDataPtr
               reqs <- removeRequest
               debugIO $ "-1.. " ++ show reqs ++ " requests queued."
               status <- FFI.overlappedIOStatus (lpOverlapped oe)
               -- TODO: Remap between STATUS_ and ERROR_ instead
               -- of re-interpret here. But for now, don't care.
               let status' = fromIntegral status
               cb status' (dwNumberOfBytesTransferred oe)
               -- free hs_lpol

      -- clear the array so we don't erroneously interpret the output, in
      -- certain circumstances like lockFileEx the code could return 1 entry
      -- removed but the file data not been filled in.
      -- TODO: Maybe not needed..
      A.clear mgrOverlappedEntries

      -- Check to see if we received the maximum amount of entries we could
      -- this likely indicates a high number of I/O requests have been queued.
      -- In which case we should process more at a time.
      cap <- A.capacity mgrOverlappedEntries
      when (cap == n) $ A.ensureCapacity mgrOverlappedEntries (2*cap)

    -- Keep running if we still have some work queued or
    -- if we have a pending delay.
    reqs <- outstandingRequests
    debugIO $ "outstanding requests: " ++ show reqs
    let more = reqs > 0
    debugIO $ "has more: " ++ show more ++ " - removed: " ++ show n
    return (more || (isJust delay && threaded), delay)

-- | Entry point for the non-threaded I/O manager to be able to process
-- completed completions.  It is mostly a wrapper around processCompletion.
processRemoteCompletion :: IO ()
processRemoteCompletion = do
  alloca $ \ptr_n -> do
    debugIO "processRemoteCompletion :: start ()"
    -- First figure out how much work we have to do.
    entries <- getOverlappedEntries ptr_n
    n <- fromIntegral `fmap` peek ptr_n
    -- This call will unmarshal data from the C buffer but pointers inside of
    -- this have not been read yet.
    _ <- peekArray n entries
    mngr <- getSystemManager
    let arr = mgrOverlappedEntries mngr
    -- Copy the RTS-provided entries into the manager's own array.
    A.unsafeSplat arr entries n
    _ <- processCompletion mngr n Nothing
    num_left <- outstandingRequests
    -- This call will unblock the non-threaded I/O manager.  After this it is no
    -- longer safe to use `entries` nor `completed`.
    servicedIOEntries num_left
    setStatus WinIOBlocked
    -- We may have been woken up due to a timer timeout.  So check for any
    -- expired timeouts.  If we have processed any completions only check
    -- timeouts, if we have been woken up only to process timeouts then check if
    -- we have to change the wait interval.
    --
    -- When not the threaded runtime we would not have reset the timer events
    -- below.  Because of this when the request is done we have an additional
    -- step here to reset the wait timers so the I/O manager doesn't keep
    -- polling at the temporary high frequency we entered.
    if (n == 0)
      then step True mngr >> return ()
      else runExpiredTimeouts mngr >> return ()
    debugIO "processRemoteCompletion :: done ()"
    return ()

-- | Event loop for the Threaded I/O manager.  The one for the non-threaded
-- I/O manager is in AsyncWinIO.c in the rts.
io_mngr_loop :: HANDLE -> Manager -> IO ()
io_mngr_loop _event mgr = go False
  where
    -- One iteration: service the port, read the pending manager event and
    -- decide whether (and how) to iterate again.
    go maxDelay = do
      setStatus WinIORunning
      (more, delay) <- step maxDelay mgr
      debugIO "I/O manager stepping."
      r2   <- c_readIOManagerEvent
      exit <- shouldExit r2
      continue exit more delay

    -- True only for an explicit shutdown request.  Any value that is not a
    -- wakeup, a die request or 0 is dispatched as a console event.
    shouldExit ev
      | ev == io_MANAGER_WAKEUP = return False
      | ev == io_MANAGER_DIE    = return True
      | ev == 0                 = return False -- spurious wakeup
      | otherwise               = do
          debugIO $ "handling console event: " ++ show (ev `shiftR` 1)
          start_console_handler (ev `shiftR` 1)
          return False

    -- If we have no more work to do, or something from the outside
    -- told us to stop then we let the thread die and stop the I/O
    -- manager.  It will be woken up again when there is more to do.
    continue exit more delay
      | exit                 = do setStatus WinIODone
                                  debugIO "I/O manager shutting down."
      | not threaded         = do setStatus WinIOBlocked
                                  debugIO "I/O manager single threaded halt."
      -- A pending timer, or more work with no ETA for it: retry with a
      -- bounded wait until we run out of work.
      | isJust delay || more = go False
      | otherwise            = go True

io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
io_MANAGER_WAKEUP = #{const IO_MANAGER_WAKEUP}
io_MANAGER_DIE    = #{const IO_MANAGER_DIE}

-- | Make sure the I/O manager is running: unless its status is already
-- 'WinIORunning', start (or wake) the manager thread so it can react to
-- changes in timers or process console signals.
wakeupIOManager :: IO ()
wakeupIOManager = do
    mngr   <- getSystemManager
    status <- getStatus
    when (status /= WinIORunning) $ do
      event <- c_getIOManagerEvent
      debugIO "waking up I/O manager."
      startIOManagerThread (io_mngr_loop event mngr)

foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
  c_getIOManagerEvent :: IO HANDLE

foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
  c_readIOManagerEvent :: IO Word32

foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
  c_sendIOManagerEvent :: Word32 -> IO ()

foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool


-- ---------------------------------------------------------------------------
-- I/O manager event notifications


-- | A registered event handler: its key, the events and lifetime it was
-- registered for, and the callback to run.
data HandleData = HandleData {
      tokenKey        :: {-# UNPACK #-} !HandleKey
    , tokenEvents     :: {-# UNPACK #-} !EventLifetime
    , _handleCallback :: !EventCallback
    }

-- | A file handle registration cookie.
data HandleKey = HandleKey {
      handleValue  :: {-# UNPACK #-} !HANDLE
    , handleUnique :: {-# UNPACK #-} !Unique
    } deriving ( Eq   -- ^ @since 4.4.0.0
               , Show -- ^ @since 4.4.0.0
               )

-- | Callback invoked on I/O events.
type EventCallback = HandleKey -> Event -> IO ()

-- | Register a callback for the given events on a handle.  The registration
-- is merged into any existing entry for the same handle and the I/O manager
-- is woken up afterwards.  The returned key identifies this registration and
-- is what 'unregisterHandle' expects.
registerHandle :: Manager -> EventCallback -> HANDLE -> Event -> Lifetime
               -> IO HandleKey
registerHandle (Manager{..}) cb hwnd evs lt = do
  u <- newUnique mgrUniqueSource
  let reg   = HandleKey hwnd u
      -- The event table is keyed on the numeric value of the handle.
      hwnd' = fromIntegral $ ptrToIntPtr hwnd
      el    = I.eventLifetime evs lt
      !hwdd = HandleData reg el cb
      event = EventData evs [(evs, hwdd)]
  _ <- withMVar mgrEvntHandlers $ \evts -> do
          IT.insertWith mappend hwnd' event evts
  wakeupIOManager
  return reg

-- | Remove the registration identified by the key.  A no-op when nothing is
-- registered for the key's handle.
unregisterHandle :: Manager -> HandleKey -> IO ()
unregisterHandle (Manager{..}) key@HandleKey{..} = do
  withMVar mgrEvntHandlers $ \evts -> do
    let hwnd' = fromIntegral $ ptrToIntPtr handleValue
    val <- IT.lookup hwnd' evts
    case val of
      Nothing -> return ()
      Just (EventData evs lst) -> do
        -- Drop the first entry registered under this key.  The entries are
        -- compared directly on their key: building a dummy 'HandleData' to
        -- compare against is not possible because all its fields are strict,
        -- so placeholder fields would be forced (and throw) on first use.
        let dropFirst [] = []
            dropFirst (e@(_, hd) : es)
              | tokenKey hd == key = es
              | otherwise          = e : dropFirst es
            new_lst = EventData evs (dropFirst lst)
        _ <- IT.updateWith (\_ -> return new_lst) hwnd' evts
        return ()


-- ---------------------------------------------------------------------------
-- debugging

#if defined(DEBUG)
c_DEBUG_DUMP :: IO Bool
c_DEBUG_DUMP = return True -- scheduler `fmap` getDebugFlags
#endif

-- | Emit a trace message on file descriptor 2 when built with DEBUG;
-- otherwise a no-op.
debugIO :: String -> IO ()
#if defined(DEBUG)
debugIO s
  = do debug <- c_DEBUG_DUMP
       when debug (dbxIO s)
#else
debugIO _ = return ()
#endif

-- | Unconditionally write a trace message, tagged with the current thread
-- id, to file descriptor 2.
dbxIO :: String -> IO ()
dbxIO s = do tid <- myThreadId
             let pref = if threaded then "\t" else ""
             _   <- withCStringLen (pref ++ "winio: " ++ s ++ " (" ++
                                    showThreadId tid ++ ")\n") $
                    \(p, len) -> c_write 2 (castPtr p) (fromIntegral len)
             return ()
\ No newline at end of file diff --git a/libraries/base/GHC/Event/Windows/Clock.hs b/libraries/base/GHC/Event/Windows/Clock.hs new file mode 100644 index 0000000000..34728248c0 --- /dev/null +++ b/libraries/base/GHC/Event/Windows/Clock.hs @@ -0,0 +1,55 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE NoImplicitPrelude #-} +module GHC.Event.Windows.Clock ( + Clock, + Seconds, + getTime, + getClock, + + -- * Specific implementations + queryPerformanceCounter, + getTickCount64 +) where + +import qualified GHC.Event.Windows.FFI as FFI + +import Data.Maybe +import GHC.Base +import GHC.Real + +-- | Monotonic clock +newtype Clock = Clock (IO Seconds) + +type Seconds = Double + +-- | Get the current time, in seconds since some fixed time in the past. +getTime :: Clock -> IO Seconds +getTime (Clock io) = io + +-- | Figure out what time API to use, and return a 'Clock' for accessing it. +getClock :: IO Clock +getClock = tryInOrder + [ queryPerformanceCounter + , fmap Just getTickCount64 + ] + +tryInOrder :: Monad m => [m (Maybe a)] -> m a +tryInOrder (x:xs) = x >>= maybe (tryInOrder xs) return +tryInOrder [] = undefined + +mapJust :: Monad m => m (Maybe a) -> (a -> b) -> m (Maybe b) +mapJust m f = liftM (fmap f) m + +queryPerformanceCounter :: IO (Maybe Clock) +queryPerformanceCounter = + FFI.queryPerformanceFrequency `mapJust` \freq -> + Clock $! do + count <- FFI.queryPerformanceCounter + let !secs = fromIntegral count / fromIntegral freq + return secs + +getTickCount64 :: IO Clock +getTickCount64 = + return $! Clock $! do + msecs <- FFI.getTickCount64 + return $! 
fromIntegral msecs / 1000 diff --git a/libraries/base/GHC/Event/Windows/ConsoleEvent.hsc b/libraries/base/GHC/Event/Windows/ConsoleEvent.hsc new file mode 100644 index 0000000000..fd6f790d3b --- /dev/null +++ b/libraries/base/GHC/Event/Windows/ConsoleEvent.hsc @@ -0,0 +1,72 @@ +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE CPP, NoImplicitPrelude, MagicHash, UnboxedTuples #-} + +----------------------------------------------------------------------------- +-- | +-- Module : GHC.Event.Windows.ConsoleEvent +-- Copyright : (c) The University of Glasgow, 1994-2002 +-- License : see libraries/base/LICENSE +-- +-- Maintainer : cvs-ghc@haskell.org +-- Stability : internal +-- Portability : non-portable (GHC extensions) +-- +-- Windows I/O manager interfaces. Depending on which I/O Subsystem is used +-- requests will be routed to different places. +-- +----------------------------------------------------------------------------- + +module GHC.Event.Windows.ConsoleEvent ( + ConsoleEvent (..), + start_console_handler, + toWin32ConsoleEvent, + win32ConsoleHandler +) where + +import GHC.Base +import GHC.Conc.Sync +import GHC.Enum (Enum) +import GHC.IO (unsafePerformIO) +import GHC.MVar +import GHC.Num (Num(..)) +import GHC.Read (Read) +import GHC.Word (Word32) +import GHC.Show (Show) + +#include <windows.h> + +data ConsoleEvent + = ControlC + | Break + | Close + -- these are sent to Services only. 
+ | Logoff + | Shutdown + deriving ( Eq -- ^ @since 4.3.0.0 + , Ord -- ^ @since 4.3.0.0 + , Enum -- ^ @since 4.3.0.0 + , Show -- ^ @since 4.3.0.0 + , Read -- ^ @since 4.3.0.0 + ) + +start_console_handler :: Word32 -> IO () +start_console_handler r = + case toWin32ConsoleEvent r of + Just x -> withMVar win32ConsoleHandler $ \handler -> do + _ <- forkIO (handler x) + return () + Nothing -> return () + +toWin32ConsoleEvent :: (Eq a, Num a) => a -> Maybe ConsoleEvent +toWin32ConsoleEvent ev = + case ev of + #{const CTRL_C_EVENT } -> Just ControlC + #{const CTRL_BREAK_EVENT } -> Just Break + #{const CTRL_CLOSE_EVENT } -> Just Close + #{const CTRL_LOGOFF_EVENT } -> Just Logoff + #{const CTRL_SHUTDOWN_EVENT } -> Just Shutdown + _ -> Nothing + +win32ConsoleHandler :: MVar (ConsoleEvent -> IO ()) +win32ConsoleHandler = + unsafePerformIO (newMVar (errorWithoutStackTrace "win32ConsoleHandler")) diff --git a/libraries/base/GHC/Event/Windows/FFI.hsc b/libraries/base/GHC/Event/Windows/FFI.hsc new file mode 100644 index 0000000000..02bac48a62 --- /dev/null +++ b/libraries/base/GHC/Event/Windows/FFI.hsc @@ -0,0 +1,395 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DoAndIfThenElse #-} +{-# LANGUAGE ForeignFunctionInterface #-} +{-# LANGUAGE EmptyDataDecls #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE NoImplicitPrelude #-} + +------------------------------------------------------------------------------- +-- | +-- Module : GHC.Event.Windows.FFI +-- Copyright : (c) Tamar Christina 2019 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable +-- +-- WinIO Windows API Foreign Function imports +-- +------------------------------------------------------------------------------- + +module GHC.Event.Windows.FFI ( + -- * IOCP + IOCP(..), + CompletionKey, + newIOCP, + associateHandleWithIOCP, + getQueuedCompletionStatusEx, + postQueuedCompletionStatus, + 
getOverlappedResult, + + -- * Overlapped + OVERLAPPED, + LPOVERLAPPED, + OVERLAPPED_ENTRY(..), + LPOVERLAPPED_ENTRY, + HASKELL_OVERLAPPED, + LPHASKELL_OVERLAPPED, + allocOverlapped, + zeroOverlapped, + pokeOffsetOverlapped, + overlappedIOStatus, + overlappedIONumBytes, + + -- * Cancel pending I/O + cancelIoEx, + cancelIoEx', + + -- * Monotonic time + + -- ** GetTickCount + getTickCount64, + + -- ** QueryPerformanceCounter + queryPerformanceCounter, + queryPerformanceFrequency, + + -- ** Miscellaneous + throwWinErr, + setLastError +) where + +#include <ntstatus.h> +#include <windows.h> +#include "winio_structs.h" + +##include "windows_cconv.h" + +import Data.Maybe +import Foreign +import GHC.Base +import GHC.Num ((*)) +import GHC.Real (fromIntegral) +import GHC.Show +import GHC.Windows +import qualified GHC.Event.Array as A +import qualified GHC.Windows as Win32 +import GHC.IO.Handle.Internals (debugIO) + +------------------------------------------------------------------------ +-- IOCP + +-- | An I/O completion port. +newtype IOCP = IOCP HANDLE + deriving (Eq, Ord, Show) + +type CompletionKey = ULONG_PTR + +-- | This function has two distinct purposes depending on the value of +-- the completion port handle: +-- +-- - When the IOCP port is NULL then the function creates a new I/O completion +-- port. See `newIOCP`. +-- +-- - When the port contains a valid handle then the given handle is +-- associated with the given completion port handle. Once associated it +-- cannot be easily changed. Associating a Handle with a Completion Port +-- allows the I/O manager's worker threads to handle requests to the given +-- handle. +foreign import WINDOWS_CCONV unsafe "windows.h CreateIoCompletionPort" + c_CreateIoCompletionPort :: HANDLE -> IOCP -> ULONG_PTR -> DWORD + -> IO IOCP + +-- | Create a new I/O completion port. 
+newIOCP :: IO IOCP +newIOCP = failIf (== IOCP nullPtr) "newIOCP" $ + c_CreateIoCompletionPort iNVALID_HANDLE_VALUE (IOCP nullPtr) 0 0 + +-- | Associate a HANDLE with an I/O completion port. +associateHandleWithIOCP :: IOCP -> HANDLE -> CompletionKey -> IO () +associateHandleWithIOCP iocp handle completionKey = + failIf_ (/= iocp) "associateHandleWithIOCP" $ + c_CreateIoCompletionPort handle iocp completionKey 0 + +foreign import WINDOWS_CCONV safe "windows.h GetOverlappedResult" + c_GetOverlappedResult :: HANDLE -> LPOVERLAPPED -> Ptr DWORD -> BOOL + -> IO BOOL + +-- | Get the result of a single overlapped operation without the IO manager +getOverlappedResult :: HANDLE -> Ptr OVERLAPPED -> BOOL -> IO (Maybe DWORD) +getOverlappedResult handle lp block + = alloca $ \bytes -> + do res <- c_GetOverlappedResult handle lp bytes block + if res + then fmap Just $ peek bytes + else return Nothing + +foreign import WINDOWS_CCONV safe "windows.h GetQueuedCompletionStatusEx" + c_GetQueuedCompletionStatusEx :: IOCP -> LPOVERLAPPED_ENTRY -> Word32 + -> Ptr ULONG -> DWORD -> BOOL -> IO BOOL + +-- | Note [Completion Ports] +-- When an I/O operation has been queued by an operation +-- (ReadFile/WriteFile/etc) it is placed in a queue that the driver uses when +-- servicing IRQs. This queue has some important properties: +-- +-- 1.) It is not an ordered queue. Requests may be performed out of order +-- as the OS's native I/O manager may try to re-order requests such that as +-- few random seeks as possible are needed to complete the pending +-- operations. As such do not assume a fixed order between something being +-- queued and dequeued. +-- +-- 2.) Operations may skip the queue entirely. In which case they do not end up +-- in this function. (This is an optimization flag we have turned on. See +-- `openFile`.) +-- +-- 3.) 
Across this call the specified OVERLAPPED_ENTRY buffer MUST remain live, +-- and the buffer for an I/O operation cannot be freed or moved until +-- `getOverlappedResult` says it's done. The reason is the kernel may not +-- have fully released the buffer, or finished writing to it when this +-- operation returns. Failure to adhere to this will cause your IRQs to be +-- silently dropped and your program will never receive a completion for it. +-- This means that the OVERLAPPED buffer must also remain valid for the +-- duration of the call and as such must be allocated on the unmanaged heap. +-- +-- 4.) When a thread calls this method it is associated with the I/O manager's +-- worker threads pool. You should always use dedicated threads for this +-- since the OS I/O manager will now monitor the threads. If the thread +-- becomes blocked for whatever reason, the I/O manager will wake up +-- another thread from its pool to service the remaining results. +-- A new thread will also be woken up from the pool when the previous thread +-- is busy servicing requests and new requests have finished. For this +-- reason the I/O manager multiplexes I/O operations from N Haskell threads +-- into 1 completion port, which is serviced by M native threads in an +-- asynchronous method. This allows it to scale efficiently. +getQueuedCompletionStatusEx :: IOCP + -> A.Array OVERLAPPED_ENTRY + -> DWORD -- ^ Timeout in milliseconds (or + -- 'GHC.Windows.iNFINITE') + -> IO Int +getQueuedCompletionStatusEx iocp arr timeout = + alloca $ \num_removed_ptr ->do + A.unsafeLoad arr $ \oes cap -> do + -- TODO: remove after debugging + fillBytes oes 0 (cap * (sizeOf (undefined :: OVERLAPPED_ENTRY))) + debugIO $ "-- call getQueuedCompletionStatusEx " + -- don't block the call if the rts is not supporting threads. + -- this would block the entire program. 
+ let alertable = False -- not rtsSupportsBoundThreads + ok <- c_GetQueuedCompletionStatusEx iocp oes (fromIntegral cap) + num_removed_ptr timeout alertable + debugIO $ "-- call getQueuedCompletionStatusEx: " ++ show ok + err <- getLastError + nc <- (peek num_removed_ptr) + debugIO $ "-- getQueuedCompletionStatusEx: n=" ++ show nc ++ " ,err=" ++ show err + if ok then fromIntegral `fmap` peek num_removed_ptr + else do debugIO $ "failed getQueuedCompletionStatusEx: " ++ show err + if err == #{const WAIT_TIMEOUT} || alertable then return 0 + else failWith "GetQueuedCompletionStatusEx" err + +overlappedIOStatus :: LPOVERLAPPED -> IO NTSTATUS +overlappedIOStatus lpol = do + status <- #{peek OVERLAPPED, Internal} lpol + -- TODO: Map NTSTATUS to ErrCode? + -- See https://github.com/libuv/libuv/blob/b12624c13693c4d29ca84b3556eadc9e9c0936a4/src/win/winsock.c#L153 + return status +{-# INLINE overlappedIOStatus #-} + +overlappedIONumBytes :: LPOVERLAPPED -> IO ULONG_PTR +overlappedIONumBytes lpol = do + bytes <- #{peek OVERLAPPED, InternalHigh} lpol + return bytes +{-# INLINE overlappedIONumBytes #-} + +foreign import WINDOWS_CCONV unsafe "windows.h PostQueuedCompletionStatus" + c_PostQueuedCompletionStatus :: IOCP -> DWORD -> ULONG_PTR -> LPOVERLAPPED + -> IO BOOL + +-- | Manually post a completion to the specified I/O port. This will wake up +-- a thread waiting on `GetQueuedCompletionStatusEx`. +postQueuedCompletionStatus :: IOCP -> DWORD -> CompletionKey -> LPOVERLAPPED + -> IO () +postQueuedCompletionStatus iocp numBytes completionKey lpol = + failIfFalse_ "PostQueuedCompletionStatus" $ + c_PostQueuedCompletionStatus iocp numBytes completionKey lpol + +------------------------------------------------------------------------ +-- Overlapped + +-- | Tag type for @LPOVERLAPPED@. +data OVERLAPPED + +-- | Tag type for the extended version of @OVERLAPPED@ containing some book +-- keeping information. +data HASKELL_OVERLAPPED + +-- | Identifies an I/O operation. 
Used as the @LPOVERLAPPED@ parameter +-- for overlapped I/O functions (e.g. @ReadFile@, @WSASend@). +type LPOVERLAPPED = Ptr OVERLAPPED + +-- | Pointer to the extended HASKELL_OVERLAPPED structure. +type LPHASKELL_OVERLAPPED = Ptr HASKELL_OVERLAPPED + +-- | An array of these is passed to GetQueuedCompletionStatusEx as an output +-- argument. +data OVERLAPPED_ENTRY = OVERLAPPED_ENTRY { + lpCompletionKey :: ULONG_PTR, + lpOverlapped :: LPOVERLAPPED, + dwNumberOfBytesTransferred :: DWORD + } + +type LPOVERLAPPED_ENTRY = Ptr OVERLAPPED_ENTRY + +instance Storable OVERLAPPED_ENTRY where + sizeOf _ = #{size OVERLAPPED_ENTRY} + alignment _ = #{alignment OVERLAPPED_ENTRY} + + peek ptr = do + lpCompletionKey <- #{peek OVERLAPPED_ENTRY, lpCompletionKey} ptr + lpOverlapped <- #{peek OVERLAPPED_ENTRY, lpOverlapped} ptr + dwNumberOfBytesTransferred <- + #{peek OVERLAPPED_ENTRY, dwNumberOfBytesTransferred} ptr + let !oe = OVERLAPPED_ENTRY{..} + return oe + + poke ptr OVERLAPPED_ENTRY{..} = do + #{poke OVERLAPPED_ENTRY, lpCompletionKey} ptr lpCompletionKey + #{poke OVERLAPPED_ENTRY, lpOverlapped} ptr lpOverlapped + #{poke OVERLAPPED_ENTRY, dwNumberOfBytesTransferred} + ptr dwNumberOfBytesTransferred + +-- | Allocate a new +-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms684342%28v=vs.85%29.aspx +-- OVERLAPPED> structure on the unmanaged heap. This also zeros the memory to +-- prevent the values inside the struct from being incorrectly interpreted as data +-- payload. +-- +-- We extend the overlapped structure with some extra book keeping information +-- such that we don't have to do a lookup on the Haskell side. +-- +-- Future: We can gain some performance here by using a pool instead of calling +-- malloc for each request. A simple block allocator would be very +-- useful here, especially when we implement sockets support. 
+allocOverlapped :: Word64 -- ^ Offset/OffsetHigh + -> IO (Ptr HASKELL_OVERLAPPED) +allocOverlapped offset = do + lpol <- mallocBytes #{size HASKELL_OVERLAPPED} + zeroOverlapped lpol + pokeOffsetOverlapped (castPtr lpol) offset + return lpol + +-- | Zero-fill an HASKELL_OVERLAPPED structure. +zeroOverlapped :: LPHASKELL_OVERLAPPED -> IO () +zeroOverlapped lpol = fillBytes lpol 0 #{size HASKELL_OVERLAPPED} +{-# INLINE zeroOverlapped #-} + +-- | Set the offset field in an OVERLAPPED structure. +pokeOffsetOverlapped :: LPOVERLAPPED -> Word64 -> IO () +pokeOffsetOverlapped lpol offset = do + let (offsetHigh, offsetLow) = Win32.ddwordToDwords offset + #{poke OVERLAPPED, Offset} lpol offsetLow + #{poke OVERLAPPED, OffsetHigh} lpol offsetHigh +{-# INLINE pokeOffsetOverlapped #-} + +------------------------------------------------------------------------ +-- Cancel pending I/O + +-- | CancelIo shouldn't block, but cancellation happens infrequently, +-- so we might as well be on the safe side. +foreign import WINDOWS_CCONV unsafe "windows.h CancelIoEx" + c_CancelIoEx :: HANDLE -> LPOVERLAPPED -> IO BOOL + +-- | Cancel all pending overlapped I/O for the given file that was initiated by +-- the current OS thread. Cancelling is just a request for cancellation and +-- before the OVERLAPPED struct is freed we must make sure that the IRQ has been +-- removed from the queue. See `getOverlappedResult`. +cancelIoEx :: HANDLE -> LPOVERLAPPED -> IO () +cancelIoEx h o = failIfFalse_ "CancelIoEx" . c_CancelIoEx h $ o + +cancelIoEx' :: HANDLE -> LPOVERLAPPED -> IO Bool +cancelIoEx' = c_CancelIoEx + +------------------------------------------------------------------------ +-- Monotonic time + +foreign import WINDOWS_CCONV "windows.h GetTickCount64" + c_GetTickCount64 :: IO #{type ULONGLONG} + +-- | Call the @GetTickCount64@ function, which returns a monotonic time in +-- milliseconds. +-- +-- Problems: +-- +-- * Low resolution (10 to 16 milliseconds). 
+-- +-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms724408%28v=vs.85%29.aspx> +getTickCount64 :: IO Word64 +getTickCount64 = c_GetTickCount64 + +-- | Call the @QueryPerformanceCounter@ function. +-- +-- Problems: +-- +-- * Might not be available on some hardware. Use 'queryPerformanceFrequency' +-- to test for availability before calling this function. +-- +-- * On a multiprocessor computer, may produce different results on +-- different processors due to hardware bugs. +-- +-- To get a monotonic time in seconds, divide the result of +-- 'queryPerformanceCounter' by that of 'queryPerformanceFrequency'. +-- +-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms644904%28v=vs.85%29.aspx> +queryPerformanceCounter :: IO Int64 +queryPerformanceCounter = + callQP c_QueryPerformanceCounter + >>= maybe (throwGetLastError "QueryPerformanceCounter") return + +-- | Call the @QueryPerformanceFrequency@ function. Return 'Nothing' if the +-- hardware does not provide a high-resolution performance counter. +-- +-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms644905%28v=vs.85%29.aspx> +queryPerformanceFrequency :: IO (Maybe Int64) +queryPerformanceFrequency = do + m <- callQP c_QueryPerformanceFrequency + case m of + Nothing -> return Nothing + Just 0 -> return Nothing -- Shouldn't happen; just a safeguard to + -- avoid a zero denominator. 
+ Just freq -> return (Just freq) + +type QPFunc = Ptr Int64 -> IO BOOL + +foreign import WINDOWS_CCONV "Windows.h QueryPerformanceCounter" + c_QueryPerformanceCounter :: QPFunc + +foreign import WINDOWS_CCONV "Windows.h QueryPerformanceFrequency" + c_QueryPerformanceFrequency :: QPFunc + +callQP :: QPFunc -> IO (Maybe Int64) +callQP qpfunc = + allocaBytes #{size LARGE_INTEGER} $ \ptr -> do + ok <- qpfunc ptr + if ok then do + n <- #{peek LARGE_INTEGER, QuadPart} ptr + return (Just n) + else + return Nothing + +------------------------------------------------------------------------ +-- Miscellaneous + +type ULONG_PTR = #type ULONG_PTR + +throwWinErr :: String -> ErrCode -> IO a +throwWinErr loc err = do + c_SetLastError err + Win32.failWith loc err + +setLastError :: ErrCode -> IO () +setLastError = c_SetLastError + +foreign import WINDOWS_CCONV unsafe "windows.h SetLastError" + c_SetLastError :: ErrCode -> IO () diff --git a/libraries/base/GHC/Event/Windows/ManagedThreadPool.hs b/libraries/base/GHC/Event/Windows/ManagedThreadPool.hs new file mode 100644 index 0000000000..11c1259257 --- /dev/null +++ b/libraries/base/GHC/Event/Windows/ManagedThreadPool.hs @@ -0,0 +1,98 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DoAndIfThenElse #-} +{-# LANGUAGE ForeignFunctionInterface #-} +{-# LANGUAGE EmptyDataDecls #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE NoImplicitPrelude #-} + +------------------------------------------------------------------------------- +-- | +-- Module : GHC.Event.Windows.ManagedThreadPool +-- Copyright : (c) Tamar Christina 2019 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable +-- +-- WinIO Windows Managed Thread pool API. This thread pool scales dynamically +-- based on demand. 
+-- +------------------------------------------------------------------------------- + +module GHC.Event.Windows.ManagedThreadPool + ( ThreadPool(..) + , startThreadPool + , notifyRunning + , notifyWaiting + ) where + +import Control.Concurrent.MVar +import Data.Maybe +import Foreign +import GHC.Base +import GHC.Num ((-), (+)) +import GHC.Real (fromIntegral) +import GHC.Show +import GHC.Windows +import qualified GHC.Event.Array as A +import qualified GHC.Windows as Win32 +import GHC.IO.Handle.Internals (debugIO) +import GHC.Conc.Sync (forkIO, showThreadId, + ThreadId(..), ThreadStatus(..), + threadStatus, sharedCAF) +import System.IO.Unsafe (unsafePerformIO) +import GHC.RTS.Flags + +------------------------------------------------------------------------ +-- Thread pool manager + +type WorkerJob = IO () + +-- | Thread pool manager state +data ThreadPool = ThreadPool + { thrMainThread :: Maybe ThreadId + , thrMaxThreads :: {-# UNPACK #-} !Int + , thrMinThreads :: {-# UNPACK #-} !Int + , thrCurThreads :: {-# UNPACK #-} !Int + , thrCallBack :: WorkerJob + , thrActiveThreads :: MVar Int + , thrMonitor :: MVar () + , thrThreadIds :: {-#UNPACK #-} !(A.Array ThreadId) + } + +startThreadPool :: WorkerJob -> IO ThreadPool +startThreadPool job = do + debugIO "Starting I/O manager threadpool..." + let thrMinThreads = 2 + let thrCurThreads = 0 + let thrCallBack = job + thrMaxThreads <- (fromIntegral . 
numIoWorkerThreads) `fmap` getMiscFlags + thrActiveThreads <- newMVar 0 + thrMonitor <- newEmptyMVar + thrThreadIds <- undefined -- A.new thrMaxThreads + let thrMainThread = Nothing + + let !pool = ThreadPool{..} + return pool + +monitorThreadPool :: MVar () -> IO () +monitorThreadPool monitor = do + active <- takeMVar monitor + + return () + +notifyRunning :: Maybe ThreadPool -> IO () +notifyRunning Nothing = return () +notifyRunning (Just pool) = do + modifyMVar_ (thrActiveThreads pool) (\x -> return $ x + 1) + _ <- tryPutMVar (thrMonitor pool) () + return () + +notifyWaiting :: Maybe ThreadPool -> IO () +notifyWaiting Nothing = return () +notifyWaiting (Just pool) = do + modifyMVar_ (thrActiveThreads pool) (\x -> return $ x - 1) + _ <- tryPutMVar (thrMonitor pool) () + return () diff --git a/libraries/base/GHC/Event/Windows/Thread.hs b/libraries/base/GHC/Event/Windows/Thread.hs new file mode 100644 index 0000000000..21d39e14b5 --- /dev/null +++ b/libraries/base/GHC/Event/Windows/Thread.hs @@ -0,0 +1,43 @@ +{-# LANGUAGE NoImplicitPrelude #-} +module GHC.Event.Windows.Thread ( + ensureIOManagerIsRunning, + interruptIOManager, + threadDelay, + registerDelay, +) where + +import GHC.Conc.Sync +import GHC.Base +import GHC.IO +import GHC.IOPort +import GHC.Real + +import GHC.Event.Windows.Clock +import GHC.Event.Windows + +ensureIOManagerIsRunning :: IO () +ensureIOManagerIsRunning = wakeupIOManager + +interruptIOManager :: IO () +interruptIOManager = interruptSystemManager + +threadDelay :: Int -> IO () +threadDelay usecs = mask_ $ do + m <- newEmptyIOPort + mgr <- getSystemManager + reg <- registerTimeout mgr secs $ writeIOPort m () >> return () + readIOPort m `onException` unregisterTimeout mgr reg + where + secs = microsecondsToSeconds usecs + +registerDelay :: Int -> IO (TVar Bool) +registerDelay usecs = do + t <- newTVarIO False + mgr <- getSystemManager + _ <- registerTimeout mgr secs $ atomically $ writeTVar t True + return t + where + secs = 
microsecondsToSeconds usecs + +microsecondsToSeconds :: Int -> Seconds +microsecondsToSeconds us = fromIntegral us / 1000000.0 diff --git a/libraries/base/GHC/IO/Types.hs b/libraries/base/GHC/IO/Types.hs new file mode 100644 index 0000000000..3124c48052 --- /dev/null +++ b/libraries/base/GHC/IO/Types.hs @@ -0,0 +1,41 @@ +{-# LANGUAGE NoImplicitPrelude, TypeSynonymInstances, FlexibleInstances #-} +------------------------------------------------------------------------------- +-- | +-- Module : GHC.IO.Types +-- Copyright : (c) Tamar Christina 2018 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable +-- +-- Abstraction over C Handle types for GHC, Unix wants FD (CInt) while Windows +-- Wants Handle (CIntPtr), so we abstract over them here. +-- +------------------------------------------------------------------------------- + +module GHC.IO.Types + ( module GHC.IO.Types + , IntPtr + , POSIX.Fd) where + +import GHC.Base +import GHC.Num +import GHC.Real + +import Foreign.Ptr (IntPtr, intPtrToPtr) +import qualified System.Posix.Types as POSIX +import qualified GHC.Windows as WIN32 + +-- To keep backwards compatibility with existing code we must use a type +-- class here due to the different widths of the native handle types of the +-- platforms. +class (Num a, Integral a) => BHandle a where + toFd :: a -> POSIX.Fd + toFd = fromIntegral + + toHandle :: a -> WIN32.HANDLE + toHandle = intPtrToPtr . 
fromIntegral + +instance BHandle POSIX.Fd where +instance BHandle IntPtr where diff --git a/libraries/base/cbits/consUtils.c b/libraries/base/cbits/consUtils.c index 5ca0c1b608..ac5d3ea75a 100644 --- a/libraries/base/cbits/consUtils.c +++ b/libraries/base/cbits/consUtils.c @@ -1,4 +1,4 @@ -/* +/* * (c) The University of Glasgow 2002 * * Win32 Console API support @@ -46,7 +46,7 @@ set_console_buffering__(int fd, int cooked) leave ECHO_INPUT enabled without also having LINE_INPUT, so we have to turn both off here. */ DWORD flgs = ENABLE_LINE_INPUT | ENABLE_ECHO_INPUT; - + if ( (h = (HANDLE)_get_osfhandle(fd)) != INVALID_HANDLE_VALUE ) { if ( GetConsoleMode(h,&st) && SetConsoleMode(h, cooked ? (st | ENABLE_LINE_INPUT) : st & ~flgs) ) { diff --git a/libraries/base/include/winio_structs.h b/libraries/base/include/winio_structs.h new file mode 100644 index 0000000000..da9dab05b7 --- /dev/null +++ b/libraries/base/include/winio_structs.h @@ -0,0 +1,40 @@ +/* + * (c) Tamar Christina, 2019. + * + * Structures supporting the IOCP based I/O Manager or Windows. + */ + +#include <Windows.h> +#include <stdint.h> + +#if defined(_WIN64) +# define ALIGNMENT __attribute__ ((aligned (8))) +#elif defined(_WIN32) +# define ALIGNMENT __attribute__ ((aligned (8))) +#else +# error "unknown environment, can't determine alignment" +#endif + +/* Completion data structure. Must be kept in sync with that in + GHC.Event.Windows or horrible things happen. */ +typedef struct _CompletionData { + /* The Handle to the object for which the I/O operation is in progress. */ + HWND cdHandle; + /* Handle to the callback routine to call to notify that an operation has + finished. This value is opaque as it shouldn't be accessible + outside the Haskell world. 
*/ + uintptr_t cdCallback; +} CompletionData, *LPCompletionData; + +/* The Windows API Requires an OVERLAPPED struct for asynchronous access, + however if we pad the structure we can give extra book keeping information + without needing to look these up later. Do not modify this struct unless + you know what you're doing. */ +typedef struct _HASKELL_OVERLAPPED { + /* Windows OVERLAPPED structure. NOTE: MUST BE FIRST element. */ + OVERLAPPED hoOverlapped; + /* Pointer to additional payload in Haskell land. This will contain a + foreign pointer. We only use atomic operations to access this field in + order to correctly handle multiple threads using it. */ + LPCompletionData hoData ALIGNMENT; +} HASKELL_OVERLAPPED; |