summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTamar Christina <tamar@zhox.com>2019-06-16 21:31:22 +0100
committerBen Gamari <ben@smart-cactus.org>2020-07-15 16:41:02 -0400
commit4489af6bad11a198e9e6c192f41e17020f28d0c1 (patch)
treea7046d2982400ef86d1e026947618c29b908cd62
parent4bf542bf1cdf2fa468457fc0af21333478293476 (diff)
downloadhaskell-4489af6bad11a198e9e6c192f41e17020f28d0c1.tar.gz
winio: core threaded I/O manager
-rw-r--r--libraries/base/GHC/Conc/IOCP.hs29
-rw-r--r--libraries/base/GHC/Conc/POSIX.hs297
-rw-r--r--libraries/base/GHC/Conc/POSIX/Const.hsc30
-rw-r--r--libraries/base/GHC/Event/Internal/Types.hs160
-rw-r--r--libraries/base/GHC/Event/TimeOut.hs40
-rw-r--r--libraries/base/GHC/Event/Windows.hsc1188
-rw-r--r--libraries/base/GHC/Event/Windows/Clock.hs55
-rw-r--r--libraries/base/GHC/Event/Windows/ConsoleEvent.hsc72
-rw-r--r--libraries/base/GHC/Event/Windows/FFI.hsc395
-rw-r--r--libraries/base/GHC/Event/Windows/ManagedThreadPool.hs98
-rw-r--r--libraries/base/GHC/Event/Windows/Thread.hs43
-rw-r--r--libraries/base/GHC/IO/Types.hs41
-rw-r--r--libraries/base/cbits/consUtils.c4
-rw-r--r--libraries/base/include/winio_structs.h40
14 files changed, 2490 insertions, 2 deletions
diff --git a/libraries/base/GHC/Conc/IOCP.hs b/libraries/base/GHC/Conc/IOCP.hs
new file mode 100644
index 0000000000..299e4fed91
--- /dev/null
+++ b/libraries/base/GHC/Conc/IOCP.hs
@@ -0,0 +1,29 @@
+{-# LANGUAGE Trustworthy #-}
+{-# LANGUAGE CPP, NoImplicitPrelude, MagicHash, UnboxedTuples #-}
+{-# OPTIONS_GHC -Wno-missing-signatures #-}
+{-# OPTIONS_HADDOCK not-home #-}
+
+-----------------------------------------------------------------------------
+-- |
+-- Module : GHC.Conc.IOCP
+-- Copyright : (c) The University of Glasgow, 1994-2002
+-- License : see libraries/base/LICENSE
+--
+-- Maintainer : cvs-ghc@haskell.org
+-- Stability : internal
+-- Portability : non-portable (GHC extensions)
+--
+-- Windows I/O Completion Port interface to the one defined in
+-- GHC.Event.Windows.
+--
+-- This module is an indirection to keep things in the same structure as before
+-- but also to keep the new code where the actual I/O manager is. As such it
+-- just re-exports GHC.Event.Windows.Thread
+--
+-----------------------------------------------------------------------------
+
+-- #not-home
+module GHC.Conc.IOCP
+ ( module GHC.Event.Windows.Thread ) where
+
+import GHC.Event.Windows.Thread
diff --git a/libraries/base/GHC/Conc/POSIX.hs b/libraries/base/GHC/Conc/POSIX.hs
new file mode 100644
index 0000000000..9a5243c2c0
--- /dev/null
+++ b/libraries/base/GHC/Conc/POSIX.hs
@@ -0,0 +1,297 @@
+{-# LANGUAGE Trustworthy #-}
+{-# LANGUAGE CPP, NoImplicitPrelude, MagicHash, UnboxedTuples #-}
+{-# OPTIONS_GHC -Wno-missing-signatures #-}
+{-# OPTIONS_HADDOCK not-home #-}
+
+-----------------------------------------------------------------------------
+-- |
+-- Module : GHC.Conc.POSIX
+-- Copyright : (c) The University of Glasgow, 1994-2002
+-- License : see libraries/base/LICENSE
+--
+-- Maintainer : cvs-ghc@haskell.org
+-- Stability : internal
+-- Portability : non-portable (GHC extensions)
+--
+-- Windows I/O manager
+-- TODO: Switch this to use new I/O manager.
+--
+-----------------------------------------------------------------------------
+
+-- #not-home
+module GHC.Conc.POSIX
+ ( ensureIOManagerIsRunning
+ , interruptIOManager
+
+ -- * Waiting
+ , threadDelay
+ , registerDelay
+
+ -- * Miscellaneous
+ , asyncRead
+ , asyncWrite
+ , asyncDoProc
+
+ , asyncReadBA
+ , asyncWriteBA
+
+ , module GHC.Event.Windows.ConsoleEvent
+ ) where
+
+
+#include "windows_cconv.h"
+
+import Data.Bits (shiftR)
+import GHC.Base
+import GHC.Conc.Sync
+import GHC.Conc.POSIX.Const
+import GHC.Event.Windows.ConsoleEvent
+import GHC.IO (unsafePerformIO)
+import GHC.IORef
+import GHC.MVar
+import GHC.Num (Num(..))
+import GHC.Ptr
+import GHC.Real (div, fromIntegral)
+import GHC.Word (Word32, Word64)
+import GHC.Windows
+import Unsafe.Coerce ( unsafeCoerceUnlifted )
+
+-- ----------------------------------------------------------------------------
+-- Thread waiting
+
+-- Note: threadWaitRead and threadWaitWrite aren't really functional
+-- on Win32, but left in there because lib code (still) uses them (the manner
+-- in which they're used doesn't cause problems on a Win32 platform though.)
+
+asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
+ IO $ \s -> case asyncRead# fd isSock len buf s of
+ (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
+
+asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
+asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
+ IO $ \s -> case asyncWrite# fd isSock len buf s of
+ (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
+
+asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
+asyncDoProc (FunPtr proc) (Ptr param) =
+ -- the 'length' value is ignored; simplifies implementation of
+ -- the async*# primops to have them all return the same result.
+ IO $ \s -> case asyncDoProc# proc param s of
+ (# s', _len#, err# #) -> (# s', I# err# #)
+
+-- to aid the use of these primops by the IO Handle implementation,
+-- provide the following convenience funs:
+
+-- this better be a pinned byte array!
+asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
+asyncReadBA fd isSock len off bufB =
+ asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerceUnlifted bufB))) `plusPtr` off)
+
+asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
+asyncWriteBA fd isSock len off bufB =
+ asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerceUnlifted bufB))) `plusPtr` off)
+
+-- ----------------------------------------------------------------------------
+-- Threaded RTS implementation of threadDelay
+
+-- | Suspends the current thread for a given number of microseconds
+-- (GHC only).
+--
+-- There is no guarantee that the thread will be rescheduled promptly
+-- when the delay has expired, but the thread will never continue to
+-- run /earlier/ than specified.
+--
+threadDelay :: Int -> IO ()
+threadDelay time
+ | threaded = waitForDelayEvent time
+ | otherwise = IO $ \s ->
+ case time of { I# time# ->
+ case delay# time# s of { s' -> (# s', () #)
+ }}
+
+-- | Set the value of returned TVar to True after a given number of
+-- microseconds. The caveats associated with threadDelay also apply.
+--
+registerDelay :: Int -> IO (TVar Bool)
+registerDelay usecs
+ | threaded = waitForDelayEventSTM usecs
+ | otherwise = errorWithoutStackTrace "registerDelay: requires -threaded"
+
+foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
+
+waitForDelayEvent :: Int -> IO ()
+waitForDelayEvent usecs = do
+ m <- newEmptyMVar
+ target <- calculateTarget usecs
+ _ <- atomicModifyIORef'_ pendingDelays (\xs -> Delay target m : xs)
+ prodServiceThread
+ takeMVar m
+
+-- Delays for use in STM
+waitForDelayEventSTM :: Int -> IO (TVar Bool)
+waitForDelayEventSTM usecs = do
+ t <- atomically $ newTVar False
+ target <- calculateTarget usecs
+ _ <- atomicModifyIORef'_ pendingDelays (\xs -> DelaySTM target t : xs)
+ prodServiceThread
+ return t
+
+calculateTarget :: Int -> IO USecs
+calculateTarget usecs = do
+ now <- getMonotonicUSec
+ return $ now + (fromIntegral usecs)
+
+data DelayReq
+ = Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
+ | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
+
+{-# NOINLINE pendingDelays #-}
+pendingDelays :: IORef [DelayReq]
+pendingDelays = unsafePerformIO $ do
+ m <- newIORef []
+ sharedCAF m getOrSetGHCConcWindowsPendingDelaysStore
+
+foreign import ccall unsafe "getOrSetGHCConcWindowsPendingDelaysStore"
+ getOrSetGHCConcWindowsPendingDelaysStore :: Ptr a -> IO (Ptr a)
+
+{-# NOINLINE ioManagerThread #-}
+ioManagerThread :: MVar (Maybe ThreadId)
+ioManagerThread = unsafePerformIO $ do
+ m <- newMVar Nothing
+ sharedCAF m getOrSetGHCConcWindowsIOManagerThreadStore
+
+foreign import ccall unsafe "getOrSetGHCConcWindowsIOManagerThreadStore"
+ getOrSetGHCConcWindowsIOManagerThreadStore :: Ptr a -> IO (Ptr a)
+
+ensureIOManagerIsRunning :: IO ()
+ensureIOManagerIsRunning
+ | threaded = startIOManagerThread
+ | otherwise = return ()
+
+interruptIOManager :: IO ()
+interruptIOManager = return ()
+
+startIOManagerThread :: IO ()
+startIOManagerThread = do
+ modifyMVar_ ioManagerThread $ \old -> do
+ let create = do t <- forkIO ioManager; return (Just t)
+ case old of
+ Nothing -> create
+ Just t -> do
+ s <- threadStatus t
+ case s of
+ ThreadFinished -> create
+ ThreadDied -> create
+ _other -> return (Just t)
+
+insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
+insertDelay d [] = [d]
+insertDelay d1 ds@(d2 : rest)
+ | delayTime d1 <= delayTime d2 = d1 : ds
+ | otherwise = d2 : insertDelay d1 rest
+
+delayTime :: DelayReq -> USecs
+delayTime (Delay t _) = t
+delayTime (DelaySTM t _) = t
+
+type USecs = Word64
+type NSecs = Word64
+
+foreign import ccall unsafe "getMonotonicNSec"
+ getMonotonicNSec :: IO NSecs
+
+getMonotonicUSec :: IO USecs
+getMonotonicUSec = fmap (`div` 1000) getMonotonicNSec
+
+{-# NOINLINE prodding #-}
+prodding :: IORef Bool
+prodding = unsafePerformIO $ do
+ r <- newIORef False
+ sharedCAF r getOrSetGHCConcWindowsProddingStore
+
+foreign import ccall unsafe "getOrSetGHCConcWindowsProddingStore"
+ getOrSetGHCConcWindowsProddingStore :: Ptr a -> IO (Ptr a)
+
+prodServiceThread :: IO ()
+prodServiceThread = do
+ -- NB. use atomicSwapIORef here, otherwise there are race
+ -- conditions in which prodding is left at True but the server is
+ -- blocked in select().
+ was_set <- atomicSwapIORef prodding True
+ when (not was_set) wakeupIOManager
+
+-- ----------------------------------------------------------------------------
+-- Windows IO manager thread
+
+ioManager :: IO ()
+ioManager = do
+ wakeup <- c_getIOManagerEvent
+ service_loop wakeup []
+
+service_loop :: HANDLE -- read end of pipe
+ -> [DelayReq] -- current delay requests
+ -> IO ()
+
+service_loop wakeup old_delays = do
+ -- pick up new delay requests
+ new_delays <- atomicSwapIORef pendingDelays []
+ let delays = foldr insertDelay old_delays new_delays
+
+ now <- getMonotonicUSec
+ (delays', timeout) <- getDelay now delays
+
+ r <- c_WaitForSingleObject wakeup timeout
+ case r of
+ 0xffffffff -> do throwGetLastError "service_loop"
+ 0 -> do
+ r2 <- c_readIOManagerEvent
+ exit <-
+ case r2 of
+ _ | r2 == io_MANAGER_WAKEUP -> return False
+ _ | r2 == io_MANAGER_DIE -> return True
+ 0 -> return False -- spurious wakeup
+ _ -> do start_console_handler (r2 `shiftR` 1); return False
+ when (not exit) $ service_cont wakeup delays'
+
+ _other -> service_cont wakeup delays' -- probably timeout
+
+service_cont :: HANDLE -> [DelayReq] -> IO ()
+service_cont wakeup delays = do
+ _ <- atomicSwapIORef prodding False
+ service_loop wakeup delays
+
+wakeupIOManager :: IO ()
+wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
+
+-- Walk the queue of pending delays, waking up any that have passed
+-- and return the smallest delay to wait for. The queue of pending
+-- delays is kept ordered.
+getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
+getDelay _ [] = return ([], iNFINITE)
+getDelay now all@(d : rest)
+ = case d of
+ Delay time m | now >= time -> do
+ putMVar m ()
+ getDelay now rest
+ DelaySTM time t | now >= time -> do
+ atomically $ writeTVar t True
+ getDelay now rest
+ _otherwise ->
+ -- delay is in millisecs for WaitForSingleObject
+ let micro_seconds = delayTime d - now
+ milli_seconds = (micro_seconds + 999) `div` 1000
+ in return (all, fromIntegral milli_seconds)
+
+foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_getIOManagerEvent :: IO HANDLE
+
+foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_readIOManagerEvent :: IO Word32
+
+foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_sendIOManagerEvent :: Word32 -> IO ()
+
+foreign import WINDOWS_CCONV "WaitForSingleObject"
+ c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
+
diff --git a/libraries/base/GHC/Conc/POSIX/Const.hsc b/libraries/base/GHC/Conc/POSIX/Const.hsc
new file mode 100644
index 0000000000..6978d4f5ec
--- /dev/null
+++ b/libraries/base/GHC/Conc/POSIX/Const.hsc
@@ -0,0 +1,30 @@
+{-# LANGUAGE Trustworthy #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+{-# OPTIONS_GHC -Wno-missing-signatures #-}
+{-# OPTIONS_HADDOCK not-home #-}
+
+-----------------------------------------------------------------------------
+-- |
+-- Module : GHC.Conc.POSIX.Const
+-- Copyright : (c) The University of Glasgow, 2019
+-- License : see libraries/base/LICENSE
+--
+-- Maintainer : cvs-ghc@haskell.org
+-- Stability : internal
+-- Portability : non-portable (GHC extensions)
+--
+-- Constants shared with the rts, GHC.Conc.POSIX uses MagicHash which confuses
+-- hsc2hs so these are moved to a new module.
+--
+-----------------------------------------------------------------------------
+
+-- #not-home
+module GHC.Conc.POSIX.Const where
+
+import Data.Word
+
+#include <Rts.h>
+
+io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
+io_MANAGER_WAKEUP = #{const IO_MANAGER_WAKEUP}
+io_MANAGER_DIE = #{const IO_MANAGER_DIE}
diff --git a/libraries/base/GHC/Event/Internal/Types.hs b/libraries/base/GHC/Event/Internal/Types.hs
new file mode 100644
index 0000000000..e02ff36b61
--- /dev/null
+++ b/libraries/base/GHC/Event/Internal/Types.hs
@@ -0,0 +1,160 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+-------------------------------------------------------------------------------
+-- |
+-- Module : GHC.IO.Types
+-- Copyright : (c) Tamar Christina 2018
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries@haskell.org
+-- Stability : experimental
+-- Portability : non-portable
+--
+-- Abstraction over C Handle types for GHC, Unix wants FD (CInt) while Windows
+-- Wants Handle (CIntPtr), so we abstract over them here.
+--
+-------------------------------------------------------------------------------
+
+module GHC.Event.Internal.Types
+ (
+ -- * Event type
+ Event
+ , evtRead
+ , evtWrite
+ , evtClose
+ , evtNothing
+ , eventIs
+ -- * Lifetimes
+ , Lifetime(..)
+ , EventLifetime
+ , eventLifetime
+ , elLifetime
+ , elEvent
+ -- * Timeout type
+ , Timeout(..)
+ ) where
+
+import Data.OldList (foldl', filter, intercalate, null)
+
+import Data.Bits ((.|.), (.&.))
+import Data.Semigroup.Internal (stimesMonoid)
+
+import GHC.Base
+import GHC.Show (Show(..))
+import GHC.Word (Word64)
+
+-- | An I\/O event.
+newtype Event = Event Int
+ deriving Eq -- ^ @since 4.4.0.0
+
+evtNothing :: Event
+evtNothing = Event 0
+{-# INLINE evtNothing #-}
+
+-- | Data is available to be read.
+evtRead :: Event
+evtRead = Event 1
+{-# INLINE evtRead #-}
+
+-- | The file descriptor is ready to accept a write.
+evtWrite :: Event
+evtWrite = Event 2
+{-# INLINE evtWrite #-}
+
+-- | Another thread closed the file descriptor.
+evtClose :: Event
+evtClose = Event 4
+{-# INLINE evtClose #-}
+
+eventIs :: Event -> Event -> Bool
+eventIs (Event a) (Event b) = a .&. b /= 0
+
+-- | @since 4.4.0.0
+instance Show Event where
+ show e = '[' : (intercalate "," . filter (not . null) $
+ [evtRead `so` "evtRead",
+ evtWrite `so` "evtWrite",
+ evtClose `so` "evtClose"]) ++ "]"
+ where ev `so` disp | e `eventIs` ev = disp
+ | otherwise = ""
+
+-- | @since 4.10.0.0
+instance Semigroup Event where
+ (<>) = evtCombine
+ stimes = stimesMonoid
+
+-- | @since 4.4.0.0
+instance Monoid Event where
+ mempty = evtNothing
+ mconcat = evtConcat
+
+evtCombine :: Event -> Event -> Event
+evtCombine (Event a) (Event b) = Event (a .|. b)
+{-# INLINE evtCombine #-}
+
+evtConcat :: [Event] -> Event
+evtConcat = foldl' evtCombine evtNothing
+{-# INLINE evtConcat #-}
+
+-- | The lifetime of an event registration.
+--
+-- @since 4.8.1.0
+data Lifetime = OneShot -- ^ the registration will be active for only one
+ -- event
+ | MultiShot -- ^ the registration will trigger multiple times
+ deriving ( Show -- ^ @since 4.8.1.0
+ , Eq -- ^ @since 4.8.1.0
+ )
+
+-- | The longer of two lifetimes.
+elSupremum :: Lifetime -> Lifetime -> Lifetime
+elSupremum OneShot OneShot = OneShot
+elSupremum _ _ = MultiShot
+{-# INLINE elSupremum #-}
+
+-- | @since 4.10.0.0
+instance Semigroup Lifetime where
+ (<>) = elSupremum
+ stimes = stimesMonoid
+
+-- | @mappend@ takes the longer of two lifetimes.
+--
+-- @since 4.8.0.0
+instance Monoid Lifetime where
+ mempty = OneShot
+
+-- | A pair of an event and lifetime
+--
+-- Here we encode the event in the bottom three bits and the lifetime
+-- in the fourth bit.
+newtype EventLifetime = EL Int
+ deriving ( Show -- ^ @since 4.8.0.0
+ , Eq -- ^ @since 4.8.0.0
+ )
+
+-- | @since 4.11.0.0
+instance Semigroup EventLifetime where
+ EL a <> EL b = EL (a .|. b)
+
+-- | @since 4.8.0.0
+instance Monoid EventLifetime where
+ mempty = EL 0
+
+eventLifetime :: Event -> Lifetime -> EventLifetime
+eventLifetime (Event e) l = EL (e .|. lifetimeBit l)
+ where
+ lifetimeBit OneShot = 0
+ lifetimeBit MultiShot = 8
+{-# INLINE eventLifetime #-}
+
+elLifetime :: EventLifetime -> Lifetime
+elLifetime (EL x) = if x .&. 8 == 0 then OneShot else MultiShot
+{-# INLINE elLifetime #-}
+
+elEvent :: EventLifetime -> Event
+elEvent (EL x) = Event (x .&. 0x7)
+{-# INLINE elEvent #-}
+
+-- | A type alias for timeouts, specified in nanoseconds.
+data Timeout = Timeout {-# UNPACK #-} !Word64
+ | Forever
+ deriving Show -- ^ @since 4.4.0.0
diff --git a/libraries/base/GHC/Event/TimeOut.hs b/libraries/base/GHC/Event/TimeOut.hs
new file mode 100644
index 0000000000..7be0a4ebc4
--- /dev/null
+++ b/libraries/base/GHC/Event/TimeOut.hs
@@ -0,0 +1,40 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+-------------------------------------------------------------------------------
+-- |
+-- Module : GHC.Event.TimeOut
+-- Copyright : (c) Tamar Christina 2018
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries@haskell.org
+-- Stability : experimental
+-- Portability : non-portable
+--
+-- Common Timer definitions shared between WinIO and RIO.
+--
+-------------------------------------------------------------------------------
+
+module GHC.Event.TimeOut where
+
+import GHC.IO
+import GHC.Base
+
+import qualified GHC.Event.PSQ as Q
+import GHC.Event.Unique (Unique)
+
+-- | A priority search queue, with timeouts as priorities.
+type TimeoutQueue = Q.PSQ TimeoutCallback
+
+-- |
+-- Warning: since the 'TimeoutCallback' is called from the I/O manager, it must
+-- not throw an exception or block for a long period of time. In particular,
+-- be wary of 'Control.Exception.throwTo' and 'Control.Concurrent.killThread':
+-- if the target thread is making a foreign call, these functions will block
+-- until the call completes.
+type TimeoutCallback = IO ()
+
+-- | An edit to apply to a 'TimeoutQueue'.
+type TimeoutEdit = TimeoutQueue -> TimeoutQueue
+
+-- | A timeout registration cookie.
+newtype TimeoutKey = TK Unique
+ deriving (Eq, Ord)
diff --git a/libraries/base/GHC/Event/Windows.hsc b/libraries/base/GHC/Event/Windows.hsc
new file mode 100644
index 0000000000..4688075daf
--- /dev/null
+++ b/libraries/base/GHC/Event/Windows.hsc
@@ -0,0 +1,1188 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE DoAndIfThenElse #-}
+{-# LANGUAGE ForeignFunctionInterface #-}
+{-# LANGUAGE PatternGuards #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+{-# LANGUAGE CPP #-}
+
+-------------------------------------------------------------------------------
+-- |
+-- Module : GHC.Event.Windows
+-- Copyright : (c) Tamar Christina 2018
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries@haskell.org
+-- Stability : experimental
+-- Portability : non-portable
+--
+-- WinIO Windows event manager.
+--
+-------------------------------------------------------------------------------
+
+module GHC.Event.Windows (
+ -- * Manager
+ Manager,
+ getSystemManager,
+ interruptSystemManager,
+ wakeupIOManager,
+ processRemoteCompletion,
+
+ -- * Overlapped I/O
+ associateHandle,
+ associateHandle',
+ withOverlapped,
+ withOverlappedEx,
+ StartCallback,
+ StartIOCallback,
+ CbResult(..),
+ CompletionCallback,
+ LPOVERLAPPED,
+
+ -- * Timeouts
+ TimeoutCallback,
+ TimeoutKey,
+ Seconds,
+ registerTimeout,
+ updateTimeout,
+ unregisterTimeout,
+
+ -- * Utilities
+ withException,
+ ioSuccess,
+ ioFailed,
+ getLastError,
+
+ -- * I/O Result type
+ IOResult(..),
+
+ -- * I/O Event notifications
+ HandleData (..), -- seal for release
+ HandleKey (handleValue),
+ registerHandle,
+ unregisterHandle,
+
+ -- * Console events
+ module GHC.Event.Windows.ConsoleEvent
+) where
+
+-- define DEBUG 1
+
+##include "windows_cconv.h"
+#include <windows.h>
+#include <ntstatus.h>
+#include <Rts.h>
+#include "winio_structs.h"
+
+import GHC.Event.Windows.Clock (Clock, Seconds, getClock, getTime)
+import GHC.Event.Windows.FFI (LPOVERLAPPED, OVERLAPPED_ENTRY(..))
+import GHC.Event.Windows.ManagedThreadPool
+import GHC.Event.Internal.Types
+import qualified GHC.Event.Windows.FFI as FFI
+import qualified GHC.Event.PSQ as Q
+import qualified GHC.Event.IntTable as IT
+import qualified GHC.Event.Internal as I
+
+import {-# SOURCE #-} Control.Concurrent
+import Control.Concurrent.MVar
+import Control.Exception as E
+import Data.IORef
+import Data.Foldable (mapM_, length, forM_)
+import Data.Maybe
+import Data.Word
+import Data.Semigroup.Internal (stimesMonoid)
+import Data.OldList (deleteBy)
+import Foreign
+import Foreign.ForeignPtr.Unsafe
+import qualified GHC.Event.Array as A
+import GHC.Base
+import GHC.Conc.Sync (forkIO, showThreadId,
+ ThreadId(..), ThreadStatus(..),
+ threadStatus, sharedCAF)
+import GHC.Event.Unique
+import GHC.Event.TimeOut
+import GHC.Event.Windows.ConsoleEvent
+import GHC.IOPort
+import GHC.Num
+import GHC.Real
+import GHC.Windows
+import GHC.List (null)
+import GHC.Ptr
+import System.IO.Unsafe (unsafePerformIO)
+import Text.Show
+import GHC.RTS.Flags
+
+-- if defined(DEBUG)
+#if 1
+import Foreign.C
+import System.Posix.Internals (c_write)
+import GHC.Conc.Sync (myThreadId)
+#endif
+
+import qualified GHC.Windows as Win32
+
+-- Note [WINIO Manager design]
+-- This file contains the Windows I//O manager. Windows's IO subsystem is by
+-- design fully asynchronous, however there are multiple ways and interfaces
+-- to the async methods.
+--
+-- The chosen Async interface for this implementation is using Completion Ports
+-- See also Note [Completion Ports]. The I/O manager uses a new interface added
+-- in Windows Vista called `GetQueuedCompletionStatusEx` which allows us to
+-- service multiple requests in one go.
+--
+-- See https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/overview-of-the-windows-i-o-model
+-- and https://www.microsoftpressstore.com/articles/article.aspx?p=2201309&seqNum=3
+--
+-- In order to understand this file, here is what you should know:
+-- We're using relatively new APIs that allow us to service multiple requests at
+-- the same time using one OS thread. This happens using so called Completion
+-- ports. All I/O actions get associated with one and the same completion port.
+--
+-- The I/O manager itself has two mode of operation:
+-- 1) Threaded: We have N dedicated OS threads in the Haskell world that service
+-- completion requests. Everything is Handled 100% in view of the runtime.
+-- Whenever the OS has completions that need to be serviced it wakes up one
+-- one of the OS threads that are blocked in GetQueuedCompletionStatusEx and
+-- lets it proceed with the list of completions that are finished. If more
+-- completions finish before the first list is done being processed then
+-- another thread is woken up. These threads are associated with the I/O
+-- manager through the completion port. If it blocks for any reason the
+-- I/O manager will wake up another thread from the pool to finish processing
+-- the remaining entries. This worker threads must be able to handle the
+-- fact that something else has finished the remainder of their queue or must
+-- have a guarantee to never block. In this implementation we strive to
+-- never block. This is achieved by not having the worker threads call out
+-- to any user code, and to have the IOPort synchronization primitive never
+-- block. This means if the port is full the message is lost, however we
+-- have an invariant that the port can never be full and have a waiting
+-- receiver. As such, dropping the message does not change anything as there
+-- will never be anyone to receive it. e.g. it is an impossible situation to
+-- land in.
+-- 2) Non-threaded: We don't have any dedicated Haskell threads at servicing
+-- I/O Requests. Instead we have an OS thread inside the RTS that gets
+-- notified of new requests and does the servicing. When a request completes
+-- a Haskell thread is scheduled to run to finish off the processing of any
+-- completed requests. See Note [Non-Threaded WINIO design].
+--
+-- These two modes of operations share the majority of the code and so they both
+-- support the same operations and fixing one will fix the other. (See the step
+-- function.)
+-- Unlike MIO, we don't threat network I/O any differently than file I/O. Hence
+-- any network specific code is now only in the network package.
+--
+-- Note [Threaded WINIO design]
+-- The threaded WiNIO is designed around a simple blocking call that's called in
+-- a service loop in a dedicated thread: `GetQueuedCompletionStatusEx`.
+-- as such the loop is reasonably simple. We're either servicing finished
+-- requests or blocking in `getQueuedCompletionStatusEx` waiting for new
+-- requests to arrive.
+--
+-- Each time a Handle is made three important things happen that affect the I/O
+-- manager design:
+-- 1) Files are opened with the `FILE_FLAG_OVERLAPPED` flag, which instructs the
+-- OS that we will be doing purely asynchronous requests. See
+-- `GHC.IO.Windows.Handle.openFile`. They are also opened with
+-- `FILE_FLAG_SEQUENTIAL_SCAN` to indicate to the OS that we want to optimize
+-- the access of the file for sequential access. (e.g. equivalent to MADVISE)
+-- 2) The created handle is associated with the I/O manager's completion port.
+-- This allows the I/O manager to be able to service I/O events from this
+-- handle. See `associateHandle`.
+-- 3) File handles are additionally modified with two optimization flags:
+--
+-- FILE_SKIP_COMPLETION_PORT_ON_SUCCESS: If the request can be serviced
+-- immediately, then do not queue the IRP (IO Request Packet) into the I/O
+-- manager waiting for us to service it later. Instead service it
+-- immediately in the same call. This is beneficial for two reasons:
+-- 1) We don't have to block in the Haskell RTS.
+-- 2) We save a bunch of work in the OS's I/O subsystem.
+-- The downside is though that we have to do a bunch of work to handle these
+-- cases. This is abstracted away from the user by the `withOverlapped`
+-- function.
+-- This together with the buffering strategy mentioned above means we
+-- actually skip the I/O manager on quite a lot of I/O requests due to the
+-- value being in the cache. Because of the Lazy I/O in Haskell, the time
+-- to read and decode the buffer of bytes is usually longer than the OS needs
+-- to read the next chunk, so we hit the FAST_IO IRP quite often.
+--
+-- FILE_SKIP_SET_EVENT_ON_HANDLE: Since we will not be using an event object
+-- to monitor asynchronous completions, don't bother updating or checking for
+-- one. This saves some precious cycles, especially on operations with very
+-- high number of I/O operations (e.g. servers.)
+--
+-- So what does servicing a request actually mean. As mentioned before the
+-- I/O manager will be blocked or servicing a request. In reality it doesn't
+-- always block till an I/O request has completed. In cases where we have event
+-- timers, we block till the next timer's timeout. This allows us to also
+-- service timers in the same loop. The side effect of this is that we will
+-- exit the I/O wait sometimes without any completions. Not really a problem
+-- but it's an important design decision.
+--
+-- Every time we wait, we give a pre-allocated buffer of `n`
+-- `OVERLAPPED_ENTRIES` to the OS. This means that in a single call we can
+-- service up to `n` I/O requests at a time. The size of `n` is not fixed,
+-- anytime we dequeue `n` I/O requests in a single operation we double the
+-- buffer size, allowing the I/O manager to be able to scale up depending
+-- on the workload. This buffer is kept alive throughout the lifetime of the
+-- program and is never freed until the I/O manager is shutting down.
+--
+-- One very important property of the I/O subsystem is that each I/O request
+-- now requires an `OVERLAPPED` structure be given to the I/O manager. See
+-- `withOverlappedEx`. This buffer is used by the OS to fill in various state
+-- information by the OS. Throughout the duration of I/O call, this buffer MUST
+-- remain live. The address is pinned by the kernel, which means that the
+-- pointer must remain accessible until `GetQueuedCompletionStatusEx` returns
+-- the completion associated with the handle and not just until the call to what
+-- ever I/O operation was used to initialize the I/O request returns.
+-- The only exception to this is when the request has hit the FAST_IO path, in
+-- which case it has skipped the I/O queue and so can be freed immediately after
+-- reading the results from it.
+--
+-- To prevent having to lookup the Haskell payload in a shared state after the
+-- request completes we attach it as part of the I/O request by extending the
+-- `OVERLAPPED` structure. Instead of passing an `OVERLAPPED` structure to the
+-- Windows API calls we instead pass a `HASKELL_OVERLAPPED` struct which has
+-- as the first element an `OVERLAPPED structure. This means when a request is
+-- done all we need to do is cast the pointer back to `HASKELL_OVERLAPPED` and
+-- read the accompanying data. This also means we don't have a global lock and
+-- so can scale much easier.
+
+-- ---------------------------------------------------------------------------
+-- I/O manager resume/suspend code
+
+{-# NOINLINE ioManagerThread #-}
+ioManagerThread :: MVar (Maybe ThreadId)
+ioManagerThread = unsafePerformIO $ do
+ m <- newMVar Nothing
+ sharedCAF m getOrSetGHCConcWindowsIOManagerThreadStore
+
+foreign import ccall unsafe "getOrSetGHCConcWindowsIOManagerThreadStore"
+ getOrSetGHCConcWindowsIOManagerThreadStore :: Ptr a -> IO (Ptr a)
+
+-- ---------------------------------------------------------------------------
+-- Non-threaded I/O manager callback hooks. See `ASyncWinIO.c`
+
+foreign import ccall safe "registerNewIOCPHandle"
+ registerNewIOCPHandle :: FFI.IOCP -> IO ()
+
+foreign import ccall safe "registerAlertableWait"
+ registerAlertableWait :: FFI.IOCP -> DWORD -> Word64 -> IO ()
+
+foreign import ccall safe "getOverlappedEntries"
+ getOverlappedEntries :: Ptr DWORD -> IO (Ptr OVERLAPPED_ENTRY)
+
+foreign import ccall safe "servicedIOEntries"
+ servicedIOEntries :: Word64 -> IO ()
+
+foreign import ccall safe "completeSynchronousRequest"
+ completeSynchronousRequest :: IO ()
+
+------------------------------------------------------------------------
+-- Manager structures
+
+-- | Callback type that will be called when an I/O operation completes.
+type IOCallback = CompletionCallback ()
+
+-- | Wrap the IOCallback type into a FunPtr.
+foreign import ccall "wrapper"
+ wrapIOCallback :: IOCallback -> IO (FunPtr IOCallback)
+
+-- | Unwrap a FunPtr IOCallback to a normal Haskell function.
+foreign import ccall "dynamic"
+ mkIOCallback :: FunPtr IOCallback -> IOCallback
+
+-- | Structure that the I/O managed uses to to associate callbacks with
+-- it's additional payload such as it's OVERLAPPED structure and Win32 handle
+-- etc. Must be kept in sync with that in `winio_structs.h` or horrible things
+-- happen.
+data CompletionData = CompletionData { cdHandle :: !HANDLE
+ , cdCallback :: !IOCallback
+ }
+
+instance Storable CompletionData where
+ sizeOf _ = #{size CompletionData}
+ alignment _ = #{alignment CompletionData}
+
+ peek ptr = do
+ cdHandle <- #{peek CompletionData, cdHandle} ptr
+ cdCallback <- mkIOCallback `fmap` #{peek CompletionData, cdCallback} ptr
+ let !cd = CompletionData{..}
+ return cd
+
+ poke ptr CompletionData{..} = do
+ #{poke CompletionData, cdHandle} ptr cdHandle
+ cb <- wrapIOCallback cdCallback
+ #{poke CompletionData, cdCallback} ptr cb
+
+-- | Pointer offset in bytes to the location of hoData in HASKELL_OVERLAPPPED
+cdOffset :: Int
+cdOffset = #{const __builtin_offsetof (HASKELL_OVERLAPPED, hoData)}
+
+-- | Terminator symbol for IOCP request
+nullReq :: Ptr (Ptr a)
+nullReq = castPtr $ unsafePerformIO $ new $ (nullPtr :: Ptr ())
+
+-- I don't expect a lot of events, so a simple linked lists should be enough.
+type EventElements = [(Event, HandleData)]
+data EventData = EventData { evtTopLevel :: !Event, evtElems :: !EventElements }
+
+instance Monoid EventData where
+ mempty = EventData evtNothing []
+ mappend = (<>)
+
+instance Semigroup EventData where
+ (<>) = \a b -> EventData (evtTopLevel a <> evtTopLevel b)
+ (evtElems a ++ evtElems b)
+ stimes = stimesMonoid
+
+data IOResult a
+ = IOSuccess { ioValue :: a }
+ | IOFailed { ioErrCode :: Maybe Int }
+
+-- | The state object for the I/O manager. This structure is available for both
+-- the threaded and the non-threaded RTS.
+data Manager = Manager
+ { mgrIOCP :: {-# UNPACK #-} !FFI.IOCP
+ , mgrClock :: !Clock
+ , mgrUniqueSource :: {-# UNPACK #-} !UniqueSource
+ , mgrTimeouts :: {-# UNPACK #-} !(IORef TimeoutQueue)
+ , mgrEvntHandlers :: {-# UNPACK #-}
+ !(MVar (IT.IntTable EventData))
+ , mgrOverlappedEntries
+ :: {-#UNPACK #-} !(A.Array OVERLAPPED_ENTRY)
+ , mgrThreadPool :: Maybe ThreadPool
+ }
+
+-- | Create a new I/O manager. In the Threaded I/O manager this call doesn't
+-- have any side effects, but in the Non-Threaded I/O manager the newly
+-- created IOCP handle will be registered with the RTS. Users should never
+-- call this.
+--
+-- NOTE: This needs to finish without making any calls to anything requiring the
+-- I/O manager otherwise we'll get into some weird synchronization issues.
+-- Essentially this means avoid using long running operations here.
+newManager :: IO Manager
+newManager = do
+ debugIO "Starting io-manager..."
+ mgrIOCP <- FFI.newIOCP
+ when (not threaded) $
+ registerNewIOCPHandle mgrIOCP
+ debugIO $ "iocp: " ++ show mgrIOCP
+ mgrClock <- getClock
+ mgrUniqueSource <- newSource
+ mgrTimeouts <- newIORef Q.empty
+ mgrOverlappedEntries <- A.new 64
+ mgrEvntHandlers <- newMVar =<< IT.new callbackArraySize
+ let mgrThreadPool = Nothing
+
+ let !mgr = Manager{..}
+ return mgr
+
+{-# INLINE startIOManagerThread #-}
+-- | Starts a new I/O manager thread.
+-- For the threaded runtime it creates a pool of OS threads which stays alive
+-- until they are instructed to die. For the non-threaded runtime we have a
+-- single worker thread in the C runtime.
+startIOManagerThread :: IO () -> IO ()
+startIOManagerThread loop = do
+ modifyMVar_ ioManagerThread $ \old -> do
+ let create = do debugIO "spawning worker threads.."
+ t <- if threaded
+ then forkOS loop
+ else forkIO loop
+ setStatus WinIORunning
+ debugIO $ "created io-manager threads."
+ return (Just t)
+ case old of
+ Nothing -> create
+ Just t -> do
+ s <- threadStatus t
+ case s of
+ ThreadFinished -> create
+ ThreadDied -> create
+ _other -> do status <- getStatus
+ case status of
+ WinIOBlocked -> do
+ c_sendIOManagerEvent io_MANAGER_WAKEUP
+ debugIO $ "woke up manager on thread: "
+ ++ showThreadId t
+ WinIOScanning -> do
+ debugIO $ "interrupted IOCP timeout wait on thread: "
+ ++ showThreadId t
+ WinIOWaiting -> do
+ debugIO $ "interrupted IOCP long wait on thread: "
+ ++ showThreadId t
+ _ -> return ()
+ when (status /= WinIORunning)
+ interruptSystemManager
+ return (Just t)
+
+-- | The various states the I/O manager can be in. Used mostly for internal
+-- bookkeeping and to make certain operations idempotent.
+data WinIOStatus
+ = WinIORunning -- ^ I/O manager is running and doing something.
+ | WinIOScanning -- ^ The I/O manager has been interrupted without servicing
+ -- a request. Likely due to a timer elapsing.
+ | WinIOWaiting -- ^ I/O manager is blocked on an alert-able wait for I/O
+ -- completions.
+ | WinIOBlocked -- ^ I/O manager is not servicing any I/O requests but the
+ -- thread is still alive. This is usually the result of
+ -- a user requested event.
+ | WinIODone -- The I/O manager was requested to terminate and has done so.
+ deriving Eq
+
+
+statusWinIO :: MVar WinIOStatus
+statusWinIO = unsafePerformIO $ newMVar WinIODone
+
+setStatus :: WinIOStatus -> IO ()
+setStatus val = modifyMVar_ statusWinIO (\_ -> return val)
+
+getStatus :: IO WinIOStatus
+getStatus = readMVar statusWinIO
+
+requests :: MVar Word64
+requests = unsafePerformIO $ newMVar 0
+
+addRequest :: IO Word64
+addRequest = modifyMVar requests (\x -> return (x + 1, x + 1))
+
+removeRequest :: IO Word64
+removeRequest = modifyMVar requests (\x -> return (x - 1, x - 1))
+
+outstandingRequests :: IO Word64
+outstandingRequests = withMVar requests return
+
+getSystemManager :: IO Manager
+getSystemManager = readMVar managerRef
+
+-- | Mutable reference to the IO manager
+managerRef :: MVar Manager
+managerRef = unsafePerformIO $ newManager >>= newMVar
+{-# NOINLINE managerRef #-}
+
+-- | Interrupts an I/O manager Wait. This will force the I/O manager to process
+-- any outstanding events and timers. Also called when console events such as
+-- ctrl+c are used to break abort an I/O request.
+interruptSystemManager :: IO ()
+interruptSystemManager = do
+ mgr <- getSystemManager
+ status <- getStatus
+ when (status /= WinIORunning) $
+ do debugIO "interrupt received.."
+ FFI.postQueuedCompletionStatus (mgrIOCP mgr) 0 0 nullPtr
+ when (status == WinIODone) $
+ do debugIO $ "I/O manager is dead. You need to revive it first. "
+ ++ "Try wakeupIOManager instead."
+
+
+-- | The initial number of I/O requests we can service at the same time.
+-- Must be power of 2. This number is used as the starting point to scale
+-- the number of concurrent requests. It will be doubled everytime we are
+-- saturated.
+callbackArraySize :: Int
+callbackArraySize = 32
+
+-----------------------------------------------------------------------
+-- Time utilities
+
+secondsToNanoSeconds :: Seconds -> Q.Prio
+secondsToNanoSeconds s = ceiling $ s * 1000000000
+
+nanoSecondsToSeconds :: Q.Prio -> Seconds
+nanoSecondsToSeconds n = fromIntegral n / 1000000000.0
+
+------------------------------------------------------------------------
+-- Overlapped I/O
+
+-- | Callback that starts the overlapped I/O operation.
+-- It must return successfully if and only if an I/O completion has been
+-- queued. Otherwise, it must throw an exception, which 'withOverlapped'
+-- will rethrow.
+type StartCallback a = LPOVERLAPPED -> IO a
+
+-- | Specialized callback type for I/O Completion Ports calls using
+-- withOverlapped.
+type StartIOCallback a = StartCallback (CbResult a)
+
+-- | CallBack result type to disambiguate between the different states
+-- an I/O Completion call could be in.
+data CbResult a
+ = CbDone (Maybe DWORD) -- ^ Request was handled immediately, no queue.
+ | CbPending -- ^ Queued and handled by I/O manager
+ | CbIncomplete -- ^ I/O request is incomplete but not enqueued, handle
+ -- it synchronously.
+ | CbError a -- ^ I/O request abort, return failure immediately
+ | CbNone Bool -- ^ The caller did not do any checking, the I/O
+ -- manager will perform additional checks.
+
+-- | Called when the completion is delivered.
+type CompletionCallback a = ErrCode -- ^ 0 indicates success
+ -> DWORD -- ^ Number of bytes transferred
+ -> IO a
+
+-- | Associate a 'HANDLE' with the current I/O manager's completion port.
+-- This must be done before using the handle with 'withOverlapped'.
+associateHandle' :: HANDLE -> IO ()
+associateHandle' hwnd
+ = do mngr <- getSystemManager
+ associateHandle mngr hwnd
+
+-- | Associate a 'HANDLE' with the I/O manager's completion port. This must be
+-- done before using the handle with 'withOverlapped'.
+associateHandle :: Manager -> HANDLE -> IO ()
+associateHandle Manager{..} h =
+ -- Use as completion key the file handle itself, so we can track completion
+ FFI.associateHandleWithIOCP mgrIOCP h (fromIntegral $ ptrToWordPtr h)
+
+-- | Start an overlapped I/O operation, and wait for its completion. If
+-- 'withOverlapped' is interrupted by an asynchronous exception, the operation
+-- will be canceled using @CancelIoEx@.
+--
+-- 'withOverlapped' waits for a completion to arrive before returning or
+-- throwing an exception. This means you can use functions like
+-- 'Foreign.Marshal.Alloc.alloca' to allocate buffers for the operation.
+withOverlappedEx :: Manager
+ -> String
+ -> HANDLE
+ -> Word64 -- ^ Value to use for the @OVERLAPPED@
+ -- structure's Offset/OffsetHigh members.
+ -> StartIOCallback Int
+ -> CompletionCallback (IOResult a)
+ -> IO (IOResult a)
+withOverlappedEx mgr fname h offset startCB completionCB = do
+ signal <- newEmptyIOPort :: IO (IOPort (IOResult a))
+ let dbg s = s ++ " (" ++ show h ++ ":" ++ show offset ++ ")"
+ let signalReturn a = failIfFalse_ (dbg "signalReturn") $
+ writeIOPort signal (IOSuccess a)
+ signalThrow ex = failIfFalse_ (dbg "signalThrow") $
+ writeIOPort signal (IOFailed ex)
+ mask_ $ do
+ let completionCB' e b = completionCB e b >>= \result ->
+ case result of
+ IOSuccess val -> signalReturn val
+ IOFailed err -> signalThrow err
+ hs_lpol <- FFI.allocOverlapped offset
+ -- Create the completion record and store it.
+ -- We only need the record when we enqueue a request, however if we
+ -- delay creating it then we will run into a race condition where the
+ -- driver may have finished servicing the request before we were ready
+ -- and so the request won't have the book keeping information to know
+ -- what to do. So because of that we always create the payload, If we
+ -- need it ok, if we don't that's no problem. This approach prevents
+ -- expensive lookups in hash-tables.
+ --
+ -- Todo: Use a memory pool for this so we don't have to hit malloc every
+ -- time. This would allow us to scale better.
+ cdData <- new (CompletionData h completionCB')
+ let ptr_lpol = hs_lpol `plusPtr` cdOffset
+ poke ptr_lpol cdData
+ let lpol = castPtr hs_lpol
+ debugIO $ "hs_lpol:" ++ show hs_lpol
+ ++ " cdData:" ++ show cdData
+ ++ " ptr_lpol:" ++ show ptr_lpol
+
+ execute <- startCB lpol `onException`
+ (CbError `fmap` Win32.getLastError) >>= \result -> do
+ -- Check to see if the operation was completed on a
+ -- non-overlapping handle or was completed immediately.
+ -- e.g. stdio redirection or data in cache, FAST I/O.
+ success <- FFI.overlappedIOStatus lpol
+ err <- fmap fromIntegral getLastError
+ -- Determine if the caller has done any checking. If not then check
+ -- to see if the request was completed synchronously. We have to
+ -- in order to prevent deadlocks since if it has completed
+ -- synchronously we've requested to not have the completion queued.
+ let result' =
+ case result of
+ CbNone ret | success == #{const STATUS_SUCCESS} -> CbDone Nothing
+ | success == #{const STATUS_END_OF_FILE} -> CbDone Nothing
+ | success == #{const STATUS_PENDING} -> CbPending
+ -- Buffer was too small.. not sure what to do, so I'll just
+ -- complete the read request
+ | err == #{const ERROR_MORE_DATA} -> CbDone Nothing
+ | err == #{const ERROR_SUCCESS} -> CbDone Nothing
+ | err == #{const ERROR_IO_PENDING} -> CbPending
+ | err == #{const ERROR_IO_INCOMPLETE} -> CbIncomplete
+ | err == #{const ERROR_HANDLE_EOF} -> CbDone Nothing
+ | not ret -> CbError err
+ | otherwise -> CbPending
+ _ -> result
+ case result' of
+ CbNone _ -> error "shouldn't happen."
+ CbIncomplete -> do
+ debugIO $ "handling incomplete request synchronously " ++ show (h, lpol)
+ res <- spinWaitComplete h lpol
+ debugIO $ "done blocking request " ++ show (h, lpol)
+ return res
+ CbPending -> do
+ -- Before we enqueue check to see if operation finished in the
+ -- mean time, since caller may not have done this.
+ -- Normally we'd have to clear lpol with 0 before this call,
+ -- however the statuses we're interested in would not get to here
+ -- so we can save the memset call.
+ finished <- FFI.getOverlappedResult h lpol False
+ debugIO $ "== " ++ show (finished)
+ status <- FFI.overlappedIOStatus lpol
+ debugIO $ "== >< " ++ show (status)
+ lasterr <- fmap fromIntegral getLastError :: IO Int
+ -- This status indicated that we have finished early and so we
+ -- won't have a request enqueued. Handle it inline.
+ let done_early = status == #{const STATUS_SUCCESS}
+ || status == #{const STATUS_END_OF_FILE}
+ || lasterr == #{const ERROR_HANDLE_EOF}
+ || lasterr == #{const ERROR_SUCCESS}
+ -- This status indicates that the request hasn't finished early,
+ -- but it will finish shortly. The I/O manager will not be
+ -- enqueuing this either. Also needs to be handled inline.
+ let will_finish_sync = lasterr == #{const ERROR_IO_INCOMPLETE}
+
+ debugIO $ "== >*< " ++ show (finished, done_early, will_finish_sync, h, lpol, lasterr)
+ case (finished, done_early, will_finish_sync) of
+ (Nothing, False, False) -> do
+ reqs <- addRequest
+ debugIO $ "+1.. " ++ show reqs ++ " requests queued. | " ++ show lpol
+ wakeupIOManager
+ return result'
+ (Nothing, False, True) -> do
+ debugIO $ "handling incomplete request synchronously " ++ show (h, lpol)
+ res <- spinWaitComplete h lpol
+ debugIO $ "done blocking request " ++ show (h, lpol)
+ return res
+ _ -> do
+ debugIO "request handled immediately (o/b), not queued."
+ return $ CbDone finished
+ CbError err' -> signalThrow (Just err') >> return result'
+ CbDone _ -> do
+ debugIO "request handled immediately (o), not queued." >> return result'
+
+ let cancel e = do
+ debugIO $ "## Exception occurred. Cancelling request... "
+ debugIO $ show (e :: SomeException)
+ _ <- uninterruptibleMask_ $ FFI.cancelIoEx' h lpol
+ -- we need to wait for the cancellation before removing
+ -- the pointer.
+ debugIO $ "## Waiting for cancellation record... "
+ _ <- FFI.getOverlappedResult h lpol True
+ let oldDataPtr = exchangePtr ptr_lpol nullReq
+ -- Check if we have to free and cleanup pointer
+ when (oldDataPtr == cdData) $
+ do free oldDataPtr
+ free hs_lpol
+ reqs <- removeRequest
+ debugIO $ "-1.. " ++ show reqs ++ " requests queued after error."
+ status <- fmap fromIntegral getLastError
+ completionCB' status 0
+ when (not threaded) $
+ do num_remaining <- outstandingRequests
+ servicedIOEntries num_remaining
+ return $ IOFailed Nothing
+ let runner = do debugIO $ (dbg ":: waiting ") ++ " | " ++ show lpol
+ res <- readIOPort signal `catch` cancel
+ debugIO $ dbg ":: signaled "
+ case res of
+ IOFailed err -> FFI.throwWinErr fname (maybe 0 fromIntegral err)
+ _ -> return res
+
+ -- Sometimes we shouldn't bother with the I/O manager as the call has
+ -- failed or is done.
+ case execute of
+ CbPending -> runner
+ CbDone rdata -> do
+ -- free cdData
+ debugIO $ dbg $ ":: done " ++ show lpol ++ " - " ++ show rdata
+ bytes <- if isJust rdata
+ then return rdata
+ -- Make sure it's safe to free the OVERLAPPED buffer
+ else FFI.getOverlappedResult h lpol False
+ case bytes of
+ Just res -> completionCB 0 res -- free hs_lpol >> completionCB 0 res
+ Nothing -> do err <- FFI.overlappedIOStatus lpol
+ numBytes <- FFI.overlappedIONumBytes lpol
+ -- TODO: Remap between STATUS_ and ERROR_ instead
+ -- of re-interpret here. But for now, don't care.
+ let err' = fromIntegral err
+ -- free hs_lpol
+ completionCB err' (fromIntegral numBytes)
+ CbError err -> do
+ free cdData
+ free hs_lpol
+ let err' = fromIntegral err
+ completionCB err' 0
+ _ -> do
+ free cdData
+ free hs_lpol
+ error "unexpected case in `execute'"
+ where spinWaitComplete fhndl lpol = do
+ -- Wait for the request to finish as it was running before and
+ -- The I/O manager won't enqueue it due to our optimizations to
+ -- prevent context switches in such cases.
+ res <- FFI.getOverlappedResult fhndl lpol False
+ status <- FFI.overlappedIOStatus lpol
+ case res of
+ -- Uses an inline definition of threadDelay to prevent an import
+ -- cycle.
+ Nothing | status == #{const STATUS_END_OF_FILE} -> do
+ when (not threaded) completeSynchronousRequest
+ return $ CbDone res
+ | otherwise ->
+ do m <- newEmptyIOPort
+ let secs = 100 / 1000000.0
+ reg <- registerTimeout mgr secs $
+ writeIOPort m () >> return ()
+ readIOPort m `onException` unregisterTimeout mgr reg
+ spinWaitComplete fhndl lpol
+ _ -> do
+ when (not threaded) completeSynchronousRequest
+ return $ CbDone res
+
+-- Safe version of function
+withOverlapped :: String
+ -> HANDLE
+ -> Word64 -- ^ Value to use for the @OVERLAPPED@
+ -- structure's Offset/OffsetHigh members.
+ -> StartIOCallback Int
+ -> CompletionCallback (IOResult a)
+ -> IO (IOResult a)
+withOverlapped fname h offset startCB completionCB = do
+ mngr <- getSystemManager
+ withOverlappedEx mngr fname h offset startCB completionCB
+
+------------------------------------------------------------------------
+-- I/O Utilities
+
+-- | Process an IOResult and throw an exception back to the user if the action
+-- has failed, or return the result.
+withException :: String -> IO (IOResult a) -> IO a
+withException name fn
+ = do res <- fn
+ case res of
+ IOSuccess a -> return a
+ IOFailed (Just err) -> FFI.throwWinErr name $ fromIntegral err
+ IOFailed Nothing -> FFI.throwWinErr name 0
+
+-- | Signal that the I/O action was successful.
+ioSuccess :: a -> IO (IOResult a)
+ioSuccess = return . IOSuccess
+
+-- | Signal that the I/O action has failed with the given reason.
+ioFailed :: Integral a => a -> IO (IOResult a)
+ioFailed = return . IOFailed . Just . fromIntegral
+
+------------------------------------------------------------------------
+-- Timeouts
+
+-- | Register an action to be performed in the given number of seconds. The
+-- returned 'TimeoutKey' can be used to later un-register or update the timeout.
+-- The timeout is automatically unregistered when it fires.
+--
+-- The 'TimeoutCallback' will not be called more than once.
+registerTimeout :: Manager -> Seconds -> TimeoutCallback -> IO TimeoutKey
+registerTimeout mgr@Manager{..} relTime cb = do
+ key <- newUnique mgrUniqueSource
+ if relTime <= 0 then cb
+ else do
+ now <- getTime mgrClock
+ let !expTime = secondsToNanoSeconds $ now + relTime
+ editTimeouts mgr (Q.unsafeInsertNew key expTime cb)
+ return $ TK key
+
+-- | Update an active timeout to fire in the given number of seconds (from the
+-- time 'updateTimeout' is called), instead of when it was going to fire.
+-- This has no effect if the timeout has already fired.
+updateTimeout :: Manager -> TimeoutKey -> Seconds -> IO ()
+updateTimeout mgr (TK key) relTime = do
+ now <- getTime (mgrClock mgr)
+ let !expTime = secondsToNanoSeconds $ now + relTime
+ editTimeouts mgr (Q.adjust (const expTime) key)
+
+-- | Unregister an active timeout. This is a harmless no-op if the timeout is
+-- already unregistered or has already fired.
+--
+-- Warning: the timeout callback may fire even after
+-- 'unregisterTimeout' completes.
+unregisterTimeout :: Manager -> TimeoutKey -> IO ()
+unregisterTimeout mgr (TK key) = do
+ editTimeouts mgr (Q.delete key)
+
+-- | Modify an existing timeout. This isn't thread safe and so if the time to
+-- elapse the timer was close it may fire anyway.
+editTimeouts :: Manager -> TimeoutEdit -> IO ()
+editTimeouts mgr g = do
+ atomicModifyIORef' (mgrTimeouts mgr) $ \tq -> (g tq, ())
+ wakeupIOManager
+
+------------------------------------------------------------------------
+-- I/O manager loop
+
+-- | Call all expired timeouts, and return how much time until the next
+-- | expiration.
+runExpiredTimeouts :: Manager -> IO (Maybe Seconds)
+runExpiredTimeouts Manager{..} = do
+ now <- getTime mgrClock
+ (expired, delay) <- atomicModifyIORef' mgrTimeouts (mkTimeout now)
+ -- Execute timeout callbacks.
+ mapM_ Q.value expired
+ when (not threaded && not (null expired))
+ completeSynchronousRequest
+ debugIO $ "expired calls: " ++ show (length expired)
+ return delay
+ where
+ mkTimeout :: Seconds -> TimeoutQueue ->
+ (TimeoutQueue, ([Q.Elem TimeoutCallback], Maybe Seconds))
+ mkTimeout now tq =
+ let (tq', (expired, sec)) = mkTimeout' (secondsToNanoSeconds now) tq
+ in (tq', (expired, fmap nanoSecondsToSeconds sec))
+ mkTimeout' :: Q.Prio -> TimeoutQueue ->
+ (TimeoutQueue, ([Q.Elem TimeoutCallback], Maybe Q.Prio))
+ mkTimeout' now tq =
+ -- Remove timeouts with expiration <= now.
+ let (expired, tq') = Q.atMost now tq in
+ -- See how soon the next timeout expires.
+ case Q.prio `fmap` Q.findMin tq' of
+ Nothing ->
+ (tq', (expired, Nothing))
+ Just t ->
+ -- This value will always be positive since the call
+ -- to 'atMost' above removed any timeouts <= 'now'
+ let !t' = t - now
+ in (tq', (expired, Just t'))
+
+-- | Return the delay argument to pass to GetQueuedCompletionStatus.
+fromTimeout :: Maybe Seconds -> Word32
+fromTimeout Nothing = 120000
+fromTimeout (Just sec) | sec > 120 = 120000
+ | sec > 0 = ceiling (sec * 1000)
+ | otherwise = 0
+
+-- | Perform one full evaluation step of the I/O manager's service loop.
+-- This means process timeouts and completed completions and calculate the time
+-- for the next timeout.
+--
+-- The I/O manager is then notified of how long it should block again based on
+-- the queued I/O requests and timers. If the I/O manager was given a command
+-- to block, shutdown or suspend than that request is honored at the end of the
+-- loop.
+step :: Bool -> Manager -> IO (Bool, Maybe Seconds)
+step maxDelay mgr@Manager{..} = do
+ -- Determine how long to wait the next time we block in an alertable state.
+ delay <- runExpiredTimeouts mgr
+ let !timer = if maxDelay && delay == Nothing
+ then #{const INFINITE}
+ else fromTimeout delay
+ debugIO $ "next timeout: " ++ show delay
+ debugIO $ "next timer: " ++ show timer -- todo: print as hex
+ case (maxDelay, delay) of
+ (_ , Just{} ) -> do setStatus WinIOWaiting
+ debugIO "I/O manager waiting."
+ (False, Nothing) -> do setStatus WinIOScanning
+ debugIO "I/O manager pausing."
+ (True , Nothing) -> do setStatus WinIOBlocked
+ debugIO "I/O manager deep sleep."
+ -- If threaded this call informs the threadpool that a thread is now
+ -- entering a kernel mode wait and this is free to be used. If non-threaded
+ -- then this is a no-op.
+ notifyWaiting mgrThreadPool
+ n <- if threaded
+ -- To quote Matt Godbolts:
+ -- There are some unusual edge cases you need to deal with. The
+ -- GetQueuedCompletionStatus function blocks a thread until there's
+ -- work for it to do. Based on the return value, the number of bytes
+ -- and the overlapped structure, there’s a lot of possible "reasons"
+ -- for the function to have returned. Deciphering all the possible
+ -- cases:
+ --
+ -- ------------------------------------------------------------------------
+ -- Ret value | OVERLAPPED | # of bytes | Description
+ -- ------------------------------------------------------------------------
+ -- zero | NULL | n/a | Call to GetQueuedCompletionStatus
+ -- failed, and no data was dequeued from the IO port. This usually
+ -- indicates an error in the parameters to GetQueuedCompletionStatus.
+ --
+ -- zero | non-NULL | n/a | Call to GetQueuedCompletionStatus
+ -- failed, but data was read or written. The thread must deal with the
+ -- data (possibly freeing any associated buffers), but there is an error
+ -- condition on the underlying HANDLE. Usually seen when the other end of
+ -- a network connection has been forcibly closed but there's still data in
+ -- the send or receive queue.
+ --
+ -- non-zero | NULL | n/a | This condition doesn't happen due
+ -- to IO requests, but is useful to use in combination with
+ -- PostQueuedCompletionStatus as a way of indicating to threads that they
+ -- should terminate.
+ --
+ -- non-zero | non-NULL | zero | End of file for a file HANDLE, or
+ -- the connection has been gracefully closed (for network connections).
+ -- The OVERLAPPED buffer has still been used; and must be deallocated if
+ -- necessary.
+ --
+ -- non-zero | non-NULL | non-zero | "num bytes" of data have been
+ -- transferred into the block pointed by the OVERLAPPED structure. The
+ -- direction of the transfer is dependant on the call made to the IO
+ -- port, it's up to the user to remember if it was a read or a write
+ -- (usually by stashing extra data in the OVERLAPPED structure). The
+ -- thread must deallocate the structure as necessary.
+ --
+ -- The getQueuedCompletionStatusEx call will remove entries queued by the OS
+ -- and returns the finished ones in mgrOverlappedEntries and the number of
+ -- entries removed.
+ then FFI.getQueuedCompletionStatusEx mgrIOCP mgrOverlappedEntries timer
+ else do num_req <- outstandingRequests
+ registerAlertableWait mgrIOCP timer num_req
+ return 0
+ setStatus WinIORunning
+ -- If threaded this call informs the threadpool manager that a thread is
+ -- busy. If all threads are busy and we have not reached the maximum amount
+ -- of allowed threads then the threadpool manager will spawn a new thread to
+ -- allow us to scale under load.
+ notifyRunning mgrThreadPool
+ processCompletion mgr n delay
+
+-- | Process the results at the end of an evaluation loop. This function will
+-- read all the completions, wake up all the Haskell threads, clean up the book
+-- keeping of the I/O manager and return whether there are outstanding work to
+-- be done and how long it expects to have to wait till it can take action
+-- again.
+--
+-- Note that this method can do less work than there are entries in the
+-- completion table. This is because some completion entries may have been
+-- created due to calls to interruptIOManager which will enqueue a faux
+-- completion.
+--
+-- NOTE: In Threaded mode things get a bit complicated the operation may have
+-- been completed even before we even got around to put the request in the
+-- waiting callback table. These events are handled by having a separate queue
+-- for orphaned callback instances that the calling thread is supposed to check
+-- before adding something to the work queue.
+processCompletion :: Manager -> Int -> Maybe Seconds -> IO (Bool, Maybe Seconds)
+processCompletion Manager{..} n delay = do
+ -- If some completions are done, we need to process them and call their
+ -- callbacks. We then remove the callbacks from the bookkeeping and resize
+ -- the index if required.
+ when (n > 0) $ do
+ forM_ [0..(n-1)] $ \idx -> do
+ oe <- A.unsafeRead mgrOverlappedEntries idx
+ let lpol = lpOverlapped oe
+ when (lpol /= nullPtr) $ do
+ let hs_lpol = castPtr lpol :: Ptr FFI.HASKELL_OVERLAPPED
+ let ptr_lpol = castPtr (hs_lpol `plusPtr` cdOffset) :: Ptr (Ptr CompletionData)
+ cdDataCheck <- peek ptr_lpol
+ debugIO $ " $ checking " ++ show lpol
+ ++ " -en ptr_lpol: " ++ show ptr_lpol
+ ++ " offset: " ++ show cdOffset
+ ++ " cdData: " ++ show cdDataCheck
+ ++ " at idx " ++ show idx
+ let oldDataPtr = exchangePtr ptr_lpol nullReq
+ when (oldDataPtr /= nullReq) $
+ do payload <- peek oldDataPtr
+ debugIO $ "exchanged: " ++ show oldDataPtr
+ let !(CompletionData _hwnd cb) = payload
+ -- free oldDataPtr
+ reqs <- removeRequest
+ debugIO $ "-1.. " ++ show reqs ++ " requests queued."
+ status <- FFI.overlappedIOStatus (lpOverlapped oe)
+ -- TODO: Remap between STATUS_ and ERROR_ instead
+ -- of re-interpret here. But for now, don't care.
+ let status' = fromIntegral status
+ cb status' (dwNumberOfBytesTransferred oe)
+ -- free hs_lpol
+
+ -- clear the array so we don't erroneously interpret the output, in
+ -- certain circumstances like lockFileEx the code could return 1 entry
+ -- removed but the file data not been filled in.
+ -- TODO: Maybe not needed..
+ A.clear mgrOverlappedEntries
+
+ -- Check to see if we received the maximum amount of entries we could
+ -- this likely indicates a high number of I/O requests have been queued.
+ -- In which case we should process more at a time.
+ cap <- A.capacity mgrOverlappedEntries
+ when (cap == n) $ A.ensureCapacity mgrOverlappedEntries (2*cap)
+
+ -- Keep running if we still have some work queued or
+ -- if we have a pending delay.
+ reqs <- outstandingRequests
+ debugIO $ "outstanding requests: " ++ show reqs
+ let more = reqs > 0
+ debugIO $ "has more: " ++ show more ++ " - removed: " ++ show n
+ return (more || (isJust delay && threaded), delay)
+
+-- | Entry point for the non-threaded I/O manager to be able to process
+-- completed completions. It is mostly a wrapper around processCompletion.
+processRemoteCompletion :: IO ()
+processRemoteCompletion = do
+ alloca $ \ptr_n -> do
+ debugIO "processRemoteCompletion :: start ()"
+ -- First figure out how much work we have to do.
+ entries <- getOverlappedEntries ptr_n
+ n <- fromIntegral `fmap` peek ptr_n
+ -- This call will unmarshal data from the C buffer but pointers inside of
+ -- this have not been read yet.
+ _ <- peekArray n entries
+ mngr <- getSystemManager
+ let arr = mgrOverlappedEntries mngr
+ A.unsafeSplat arr entries n
+ _ <- processCompletion mngr n Nothing
+ num_left <- outstandingRequests
+ -- This call will unblock the non-threaded I/O manager. After this it is no
+ -- longer safe to use `entries` nor `completed`.
+ servicedIOEntries num_left
+ setStatus WinIOBlocked
+ -- We may have been woken up due to a timer timeout. So check for any
+ -- expired timeouts. If we have processed any completions only check
+ -- timeouts, if we have been woken up only to process timeouts then check if
+ -- we have to change the wait interval.
+ --
+ -- When not the threaded runtime we would not have reset the timer events
+ -- below. Because of this when the request is done we have an additional
+ -- step here to reset the wait timers so the I/O manager doesn't keep
+ -- polling at the temporary high frequency we entered.
+ if (n == 0)
+ then step True mngr >> return ()
+ else runExpiredTimeouts mngr >> return ()
+ debugIO "processRemoteCompletion :: done ()"
+ return ()
+
+-- | Even loop for the Threaded I/O manager. The one for the non-threaded
+-- I/O manager is in AsyncWinIO.c in the rts.
+io_mngr_loop :: HANDLE -> Manager -> IO ()
+io_mngr_loop _event mgr = go False
+ where
+ go maxDelay =
+ do setStatus WinIORunning
+ (more, delay) <- step maxDelay mgr
+ debugIO "I/O manager stepping."
+ r2 <- c_readIOManagerEvent
+ exit <-
+ case r2 of
+ _ | r2 == io_MANAGER_WAKEUP -> return False
+ _ | r2 == io_MANAGER_DIE -> return True
+ 0 -> return False -- spurious wakeup
+ _ -> do debugIO $ "handling console event: " ++ show (r2 `shiftR` 1)
+ start_console_handler (r2 `shiftR` 1)
+ return False
+
+ -- If we have no more work to do, or something from the outside
+ -- told us to stop then we let the thread die and stop the I/O
+ -- manager. It will be woken up again when there is more to do.
+ case () of
+ _ | exit -> do setStatus WinIODone
+ debugIO "I/O manager shutting down."
+ _ | not threaded -> do setStatus WinIOBlocked
+ debugIO "I/O manager single threaded halt."
+ _ | isJust delay -> go False
+ -- We seem to have more work but no ETA for it.
+ -- So just retry until we run out of work.
+ _ | more -> go False
+ _ -> go True
+
+io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
+io_MANAGER_WAKEUP = #{const IO_MANAGER_WAKEUP}
+io_MANAGER_DIE = #{const IO_MANAGER_DIE}
+
+-- | Wake up a single thread from the I/O Manager's worker queue. This will
+-- unblock a thread blocked in `processCompletion` and allows the I/O manager to
+-- react accordingly to changes in timers or to process console signals.
+wakeupIOManager :: IO ()
+wakeupIOManager
+ = do mngr <- getSystemManager
+ status <- getStatus
+ when (status /= WinIORunning) $ do
+ event <- c_getIOManagerEvent
+ debugIO "waking up I/O manager."
+ startIOManagerThread (io_mngr_loop event mngr)
+
+foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_getIOManagerEvent :: IO HANDLE
+
+foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_readIOManagerEvent :: IO Word32
+
+foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
+ c_sendIOManagerEvent :: Word32 -> IO ()
+
+foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
+
+
+-- ---------------------------------------------------------------------------
+-- I/O manager event notifications
+
+
+data HandleData = HandleData {
+ tokenKey :: {-# UNPACK #-} !HandleKey
+ , tokenEvents :: {-# UNPACK #-} !EventLifetime
+ , _handleCallback :: !EventCallback
+ }
+
+-- | A file handle registration cookie.
+data HandleKey = HandleKey {
+ handleValue :: {-# UNPACK #-} !HANDLE
+ , handleUnique :: {-# UNPACK #-} !Unique
+ } deriving ( Eq -- ^ @since 4.4.0.0
+ , Show -- ^ @since 4.4.0.0
+ )
+
+-- | Callback invoked on I/O events.
+type EventCallback = HandleKey -> Event -> IO ()
+
+registerHandle :: Manager -> EventCallback -> HANDLE -> Event -> Lifetime
+ -> IO HandleKey
+registerHandle (Manager{..}) cb hwnd evs lt = do
+ u <- newUnique mgrUniqueSource
+ let reg = HandleKey hwnd u
+ hwnd' = fromIntegral $ ptrToIntPtr hwnd
+ el = I.eventLifetime evs lt
+ !hwdd = HandleData reg el cb
+ event = EventData evs [(evs, hwdd)]
+ _ <- withMVar mgrEvntHandlers $ \evts -> do
+ IT.insertWith mappend hwnd' event evts
+ wakeupIOManager
+ return reg
+
+unregisterHandle :: Manager -> HandleKey -> IO ()
+unregisterHandle (Manager{..}) key@HandleKey{..} = do
+ withMVar mgrEvntHandlers $ \evts -> do
+ let hwnd' = fromIntegral $ ptrToIntPtr handleValue
+ val <- IT.lookup hwnd' evts
+ case val of
+ Nothing -> return ()
+ Just (EventData evs lst) -> do
+ let cmp (_, a) (_, b) = tokenKey a == tokenKey b
+ key' = (undefined, HandleData key undefined undefined)
+ updated = deleteBy cmp key' lst
+ new_lst = EventData evs updated
+ _ <- IT.updateWith (\_ -> return new_lst) hwnd' evts
+ return ()
+
+
+-- ---------------------------------------------------------------------------
+-- debugging
+
+#if defined(DEBUG)
+c_DEBUG_DUMP :: IO Bool
+c_DEBUG_DUMP = return True -- scheduler `fmap` getDebugFlags
+#endif
+
+debugIO :: String -> IO ()
+#if defined(DEBUG)
+debugIO s
+ = do debug <- c_DEBUG_DUMP
+ if debug
+ then do tid <- myThreadId
+ let pref = if threaded then "\t" else ""
+ _ <- withCStringLen (pref ++ "winio: " ++ s ++ " (" ++
+ showThreadId tid ++ ")\n") $
+ \(p, len) -> c_write 2 (castPtr p) (fromIntegral len)
+ return ()
+ else do return ()
+#else
+debugIO _ = return ()
+#endif
+
+dbxIO :: String -> IO ()
+dbxIO s = do tid <- myThreadId
+ let pref = if threaded then "\t" else ""
+ _ <- withCStringLen (pref ++ "winio: " ++ s ++ " (" ++
+ showThreadId tid ++ ")\n") $
+ \(p, len) -> c_write 2 (castPtr p) (fromIntegral len)
+ return () \ No newline at end of file
diff --git a/libraries/base/GHC/Event/Windows/Clock.hs b/libraries/base/GHC/Event/Windows/Clock.hs
new file mode 100644
index 0000000000..34728248c0
--- /dev/null
+++ b/libraries/base/GHC/Event/Windows/Clock.hs
@@ -0,0 +1,55 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+module GHC.Event.Windows.Clock (
+ Clock,
+ Seconds,
+ getTime,
+ getClock,
+
+ -- * Specific implementations
+ queryPerformanceCounter,
+ getTickCount64
+) where
+
+import qualified GHC.Event.Windows.FFI as FFI
+
+import Data.Maybe
+import GHC.Base
+import GHC.Real
+
+-- | Monotonic clock
+newtype Clock = Clock (IO Seconds)
+
+type Seconds = Double
+
+-- | Get the current time, in seconds since some fixed time in the past.
+getTime :: Clock -> IO Seconds
+getTime (Clock io) = io
+
+-- | Figure out what time API to use, and return a 'Clock' for accessing it.
+getClock :: IO Clock
+getClock = tryInOrder
+ [ queryPerformanceCounter
+ , fmap Just getTickCount64
+ ]
+
+tryInOrder :: Monad m => [m (Maybe a)] -> m a
+tryInOrder (x:xs) = x >>= maybe (tryInOrder xs) return
+tryInOrder [] = undefined
+
+mapJust :: Monad m => m (Maybe a) -> (a -> b) -> m (Maybe b)
+mapJust m f = liftM (fmap f) m
+
+queryPerformanceCounter :: IO (Maybe Clock)
+queryPerformanceCounter =
+ FFI.queryPerformanceFrequency `mapJust` \freq ->
+ Clock $! do
+ count <- FFI.queryPerformanceCounter
+ let !secs = fromIntegral count / fromIntegral freq
+ return secs
+
+getTickCount64 :: IO Clock
+getTickCount64 =
+ return $! Clock $! do
+ msecs <- FFI.getTickCount64
+ return $! fromIntegral msecs / 1000
diff --git a/libraries/base/GHC/Event/Windows/ConsoleEvent.hsc b/libraries/base/GHC/Event/Windows/ConsoleEvent.hsc
new file mode 100644
index 0000000000..fd6f790d3b
--- /dev/null
+++ b/libraries/base/GHC/Event/Windows/ConsoleEvent.hsc
@@ -0,0 +1,72 @@
+{-# LANGUAGE Trustworthy #-}
+{-# LANGUAGE CPP, NoImplicitPrelude, MagicHash, UnboxedTuples #-}
+
+-----------------------------------------------------------------------------
+-- |
+-- Module : GHC.Event.Windows.ConsoleEvent
+-- Copyright : (c) The University of Glasgow, 1994-2002
+-- License : see libraries/base/LICENSE
+--
+-- Maintainer : cvs-ghc@haskell.org
+-- Stability : internal
+-- Portability : non-portable (GHC extensions)
+--
+-- Windows I/O manager interfaces. Depending on which I/O Subsystem is used
+-- requests will be routed to different places.
+--
+-----------------------------------------------------------------------------
+
+module GHC.Event.Windows.ConsoleEvent (
+ ConsoleEvent (..),
+ start_console_handler,
+ toWin32ConsoleEvent,
+ win32ConsoleHandler
+) where
+
+import GHC.Base
+import GHC.Conc.Sync
+import GHC.Enum (Enum)
+import GHC.IO (unsafePerformIO)
+import GHC.MVar
+import GHC.Num (Num(..))
+import GHC.Read (Read)
+import GHC.Word (Word32)
+import GHC.Show (Show)
+
+#include <windows.h>
+
+data ConsoleEvent
+ = ControlC
+ | Break
+ | Close
+ -- these are sent to Services only.
+ | Logoff
+ | Shutdown
+ deriving ( Eq -- ^ @since 4.3.0.0
+ , Ord -- ^ @since 4.3.0.0
+ , Enum -- ^ @since 4.3.0.0
+ , Show -- ^ @since 4.3.0.0
+ , Read -- ^ @since 4.3.0.0
+ )
+
+start_console_handler :: Word32 -> IO ()
+start_console_handler r =
+ case toWin32ConsoleEvent r of
+ Just x -> withMVar win32ConsoleHandler $ \handler -> do
+ _ <- forkIO (handler x)
+ return ()
+ Nothing -> return ()
+
+toWin32ConsoleEvent :: (Eq a, Num a) => a -> Maybe ConsoleEvent
+toWin32ConsoleEvent ev =
+ case ev of
+ #{const CTRL_C_EVENT } -> Just ControlC
+ #{const CTRL_BREAK_EVENT } -> Just Break
+ #{const CTRL_CLOSE_EVENT } -> Just Close
+ #{const CTRL_LOGOFF_EVENT } -> Just Logoff
+ #{const CTRL_SHUTDOWN_EVENT } -> Just Shutdown
+ _ -> Nothing
+
+win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
+win32ConsoleHandler =
+ unsafePerformIO (newMVar (errorWithoutStackTrace "win32ConsoleHandler"))
diff --git a/libraries/base/GHC/Event/Windows/FFI.hsc b/libraries/base/GHC/Event/Windows/FFI.hsc
new file mode 100644
index 0000000000..02bac48a62
--- /dev/null
+++ b/libraries/base/GHC/Event/Windows/FFI.hsc
@@ -0,0 +1,395 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE DoAndIfThenElse #-}
+{-# LANGUAGE ForeignFunctionInterface #-}
+{-# LANGUAGE EmptyDataDecls #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-------------------------------------------------------------------------------
+-- |
+-- Module : GHC.Event.Windows.FFI
+-- Copyright : (c) Tamar Christina 2019
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries@haskell.org
+-- Stability : experimental
+-- Portability : non-portable
+--
+-- WinIO Windows API Foreign Function imports
+--
+-------------------------------------------------------------------------------
+
+module GHC.Event.Windows.FFI (
+ -- * IOCP
+ IOCP(..),
+ CompletionKey,
+ newIOCP,
+ associateHandleWithIOCP,
+ getQueuedCompletionStatusEx,
+ postQueuedCompletionStatus,
+ getOverlappedResult,
+
+ -- * Overlapped
+ OVERLAPPED,
+ LPOVERLAPPED,
+ OVERLAPPED_ENTRY(..),
+ LPOVERLAPPED_ENTRY,
+ HASKELL_OVERLAPPED,
+ LPHASKELL_OVERLAPPED,
+ allocOverlapped,
+ zeroOverlapped,
+ pokeOffsetOverlapped,
+ overlappedIOStatus,
+ overlappedIONumBytes,
+
+ -- * Cancel pending I/O
+ cancelIoEx,
+ cancelIoEx',
+
+ -- * Monotonic time
+
+ -- ** GetTickCount
+ getTickCount64,
+
+ -- ** QueryPerformanceCounter
+ queryPerformanceCounter,
+ queryPerformanceFrequency,
+
+ -- ** Miscellaneous
+ throwWinErr,
+ setLastError
+) where
+
+#include <ntstatus.h>
+#include <windows.h>
+#include "winio_structs.h"
+
+##include "windows_cconv.h"
+
+import Data.Maybe
+import Foreign
+import GHC.Base
+import GHC.Num ((*))
+import GHC.Real (fromIntegral)
+import GHC.Show
+import GHC.Windows
+import qualified GHC.Event.Array as A
+import qualified GHC.Windows as Win32
+import GHC.IO.Handle.Internals (debugIO)
+
+------------------------------------------------------------------------
+-- IOCP
+
+-- | An I/O completion port.
+newtype IOCP = IOCP HANDLE
+ deriving (Eq, Ord, Show)
+
+type CompletionKey = ULONG_PTR
+
+-- | This function has two distinct purposes depending on the value of
+-- The completion port handle:
+--
+-- - When the IOCP port is NULL then the function creates a new I/O completion
+-- port. See `newIOCP`.
+--
+-- - When The port contains a valid handle then the given handle is
+-- associated with he given completion port handle. Once associated it
+-- cannot be easily changed. Associating a Handle with a Completion Port
+-- allows the I/O manager's worker threads to handle requests to the given
+-- handle.
+foreign import WINDOWS_CCONV unsafe "windows.h CreateIoCompletionPort"
+ c_CreateIoCompletionPort :: HANDLE -> IOCP -> ULONG_PTR -> DWORD
+ -> IO IOCP
+
+-- | Create a new I/O completion port.
+newIOCP :: IO IOCP
+newIOCP = failIf (== IOCP nullPtr) "newIOCP" $
+ c_CreateIoCompletionPort iNVALID_HANDLE_VALUE (IOCP nullPtr) 0 0
+
+-- | Associate a HANDLE with an I/O completion port.
+associateHandleWithIOCP :: IOCP -> HANDLE -> CompletionKey -> IO ()
+associateHandleWithIOCP iocp handle completionKey =
+ failIf_ (/= iocp) "associateHandleWithIOCP" $
+ c_CreateIoCompletionPort handle iocp completionKey 0
+
+foreign import WINDOWS_CCONV safe "windows.h GetOverlappedResult"
+ c_GetOverlappedResult :: HANDLE -> LPOVERLAPPED -> Ptr DWORD -> BOOL
+ -> IO BOOL
+
+-- | Get the result of a single overlap operation without the IO manager
+getOverlappedResult :: HANDLE -> Ptr OVERLAPPED -> BOOL -> IO (Maybe DWORD)
+getOverlappedResult handle lp block
+ = alloca $ \bytes ->
+ do res <- c_GetOverlappedResult handle lp bytes block
+ if res
+ then fmap Just $ peek bytes
+ else return Nothing
+
+foreign import WINDOWS_CCONV safe "windows.h GetQueuedCompletionStatusEx"
+ c_GetQueuedCompletionStatusEx :: IOCP -> LPOVERLAPPED_ENTRY -> Word32
+ -> Ptr ULONG -> DWORD -> BOOL -> IO BOOL
+
+-- | Note [Completion Ports]
+-- When an I/O operation has been queued by an operation
+-- (ReadFile/WriteFile/etc) it is placed in a queue that the driver uses when
+-- servicing IRQs. This queue has some important properties:
+--
+-- 1.) It is not an ordered queue. Requests may be performed out of order as
+-- as the OS's native I/O manager may try to re-order requests such that as
+-- few random seeks as possible are needed to complete the pending
+-- operations. As such do not assume a fixed order between something being
+-- queued and dequeued.
+--
+-- 2.) Operations may skip the queue entirely. In which case they do not end in
+-- in this function. (This is an optimization flag we have turned on. See
+-- `openFile`.)
+--
+-- 3.) Across this call the specified OVERLAPPED_ENTRY buffer MUST remain live,
+-- and the buffer for an I/O operation cannot be freed or moved until
+-- `getOverlappedResult` says it's done. The reason is the kernel may not
+-- have fully released the buffer, or finished writing to it when this
+-- operation returns. Failure to adhere to this will cause your IRQs to be
+-- silently dropped and your program will never receive a completion for it.
+-- This means that the OVERLAPPED buffer must also remain valid for the
+-- duration of the call and as such must be allocated on the unmanaged heap.
+--
+-- 4.) When a thread calls this method it is associated with the I/O manager's
+-- worker threads pool. You should always use dedicated threads for this
+-- since the OS I/O manager will now monitor the threads. If the thread
+-- becomes blocked for whatever reason, the I/O manager will wake up
+-- another threads from it's pool to service the remaining results.
+-- A new thread will also be woken up from the pool when the previous thread
+-- is busy servicing requests and new requests have finished. For this
+-- reason the I/O manager multiplexes I/O operations from N haskell threads
+-- into 1 completion port, which is serviced by M native threads in an
+-- asynchronous method. This allows it to scale efficiently.
+getQueuedCompletionStatusEx :: IOCP
+ -> A.Array OVERLAPPED_ENTRY
+ -> DWORD -- ^ Timeout in milliseconds (or
+ -- 'GHC.Windows.iNFINITE')
+ -> IO Int
+getQueuedCompletionStatusEx iocp arr timeout =
+ alloca $ \num_removed_ptr ->do
+ A.unsafeLoad arr $ \oes cap -> do
+ -- TODO: remove after debugging
+ fillBytes oes 0 (cap * (sizeOf (undefined :: OVERLAPPED_ENTRY)))
+ debugIO $ "-- call getQueuedCompletionStatusEx "
+ -- don't block the call if the rts is not supporting threads.
+ -- this would block the entire program.
+ let alertable = False -- not rtsSupportsBoundThreads
+ ok <- c_GetQueuedCompletionStatusEx iocp oes (fromIntegral cap)
+ num_removed_ptr timeout alertable
+ debugIO $ "-- call getQueuedCompletionStatusEx: " ++ show ok
+ err <- getLastError
+ nc <- (peek num_removed_ptr)
+ debugIO $ "-- getQueuedCompletionStatusEx: n=" ++ show nc ++ " ,err=" ++ show err
+ if ok then fromIntegral `fmap` peek num_removed_ptr
+ else do debugIO $ "failed getQueuedCompletionStatusEx: " ++ show err
+ if err == #{const WAIT_TIMEOUT} || alertable then return 0
+ else failWith "GetQueuedCompletionStatusEx" err
+
+overlappedIOStatus :: LPOVERLAPPED -> IO NTSTATUS
+overlappedIOStatus lpol = do
+ status <- #{peek OVERLAPPED, Internal} lpol
+ -- TODO: Map NTSTATUS to ErrCode?
+ -- See https://github.com/libuv/libuv/blob/b12624c13693c4d29ca84b3556eadc9e9c0936a4/src/win/winsock.c#L153
+ return status
+{-# INLINE overlappedIOStatus #-}
+
+overlappedIONumBytes :: LPOVERLAPPED -> IO ULONG_PTR
+overlappedIONumBytes lpol = do
+ bytes <- #{peek OVERLAPPED, InternalHigh} lpol
+ return bytes
+{-# INLINE overlappedIONumBytes #-}
+
+foreign import WINDOWS_CCONV unsafe "windows.h PostQueuedCompletionStatus"
+ c_PostQueuedCompletionStatus :: IOCP -> DWORD -> ULONG_PTR -> LPOVERLAPPED
+ -> IO BOOL
+
+-- | Manually post a completion to the specified I/O port. This will wake up
+-- a thread waiting `GetQueuedCompletionStatusEx`.
+postQueuedCompletionStatus :: IOCP -> DWORD -> CompletionKey -> LPOVERLAPPED
+ -> IO ()
+postQueuedCompletionStatus iocp numBytes completionKey lpol =
+ failIfFalse_ "PostQueuedCompletionStatus" $
+ c_PostQueuedCompletionStatus iocp numBytes completionKey lpol
+
+------------------------------------------------------------------------
+-- Overlapped
+
+-- | Tag type for @LPOVERLAPPED@.
+data OVERLAPPED
+
+-- | Tag type for the extended version of @OVERLAPPED@ containg some book
+-- keeping information.
+data HASKELL_OVERLAPPED
+
+-- | Identifies an I/O operation. Used as the @LPOVERLAPPED@ parameter
+-- for overlapped I/O functions (e.g. @ReadFile@, @WSASend@).
+type LPOVERLAPPED = Ptr OVERLAPPED
+
+-- | Pointer to the extended HASKELL_OVERLAPPED function.
+type LPHASKELL_OVERLAPPED = Ptr HASKELL_OVERLAPPED
+
+-- | An array of these is passed to GetQueuedCompletionStatusEx as an output
+-- argument.
+data OVERLAPPED_ENTRY = OVERLAPPED_ENTRY {
+ lpCompletionKey :: ULONG_PTR,
+ lpOverlapped :: LPOVERLAPPED,
+ dwNumberOfBytesTransferred :: DWORD
+ }
+
+type LPOVERLAPPED_ENTRY = Ptr OVERLAPPED_ENTRY
+
+instance Storable OVERLAPPED_ENTRY where
+ sizeOf _ = #{size OVERLAPPED_ENTRY}
+ alignment _ = #{alignment OVERLAPPED_ENTRY}
+
+ peek ptr = do
+ lpCompletionKey <- #{peek OVERLAPPED_ENTRY, lpCompletionKey} ptr
+ lpOverlapped <- #{peek OVERLAPPED_ENTRY, lpOverlapped} ptr
+ dwNumberOfBytesTransferred <-
+ #{peek OVERLAPPED_ENTRY, dwNumberOfBytesTransferred} ptr
+ let !oe = OVERLAPPED_ENTRY{..}
+ return oe
+
+ poke ptr OVERLAPPED_ENTRY{..} = do
+ #{poke OVERLAPPED_ENTRY, lpCompletionKey} ptr lpCompletionKey
+ #{poke OVERLAPPED_ENTRY, lpOverlapped} ptr lpOverlapped
+ #{poke OVERLAPPED_ENTRY, dwNumberOfBytesTransferred}
+ ptr dwNumberOfBytesTransferred
+
+-- | Allocate a new
+-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms684342%28v=vs.85%29.aspx
+-- OVERLAPPED> structure on the unmanaged heap. This also zeros the memory to
+-- prevent the values inside the struct to be incorrectlt interpreted as data
+-- payload.
+--
+-- We extend the overlapped structure with some extra book keeping information
+-- such that we don't have to do a lookup on the Haskell side.
+--
+-- Future: We can gain some performance here by using a pool instead of calling
+-- malloc for each request. A simple block allocator would be very
+-- useful here, especially when we implement sockets support.
+allocOverlapped :: Word64 -- ^ Offset/OffsetHigh
+ -> IO (Ptr HASKELL_OVERLAPPED)
+allocOverlapped offset = do
+ lpol <- mallocBytes #{size HASKELL_OVERLAPPED}
+ zeroOverlapped lpol
+ pokeOffsetOverlapped (castPtr lpol) offset
+ return lpol
+
+-- | Zero-fill an HASKELL_OVERLAPPED structure.
+zeroOverlapped :: LPHASKELL_OVERLAPPED -> IO ()
+zeroOverlapped lpol = fillBytes lpol 0 #{size HASKELL_OVERLAPPED}
+{-# INLINE zeroOverlapped #-}
+
+-- | Set the offset field in an OVERLAPPED structure.
+pokeOffsetOverlapped :: LPOVERLAPPED -> Word64 -> IO ()
+pokeOffsetOverlapped lpol offset = do
+ let (offsetHigh, offsetLow) = Win32.ddwordToDwords offset
+ #{poke OVERLAPPED, Offset} lpol offsetLow
+ #{poke OVERLAPPED, OffsetHigh} lpol offsetHigh
+{-# INLINE pokeOffsetOverlapped #-}
+
+------------------------------------------------------------------------
+-- Cancel pending I/O
+
+-- | CancelIo shouldn't block, but cancellation happens infrequently,
+-- so we might as well be on the safe side.
+foreign import WINDOWS_CCONV unsafe "windows.h CancelIoEx"
+ c_CancelIoEx :: HANDLE -> LPOVERLAPPED -> IO BOOL
+
+-- | Cancel all pending overlapped I/O for the given file that was initiated by
+-- the current OS thread. Cancelling is just a request for cancellation and
+-- before the OVERLAPPED struct is freed we must make sure that the IRQ has been
+-- removed from the queue. See `getOverlappedResult`.
+cancelIoEx :: HANDLE -> LPOVERLAPPED -> IO ()
+cancelIoEx h o = failIfFalse_ "CancelIoEx" . c_CancelIoEx h $ o
+
+cancelIoEx' :: HANDLE -> LPOVERLAPPED -> IO Bool
+cancelIoEx' = c_CancelIoEx
+
+------------------------------------------------------------------------
+-- Monotonic time
+
+foreign import WINDOWS_CCONV "windows.h GetTickCount64"
+ c_GetTickCount64 :: IO #{type ULONGLONG}
+
+-- | Call the @GetTickCount64@ function, which returns a monotonic time in
+-- milliseconds.
+--
+-- Problems:
+--
+-- * Low resolution (10 to 16 milliseconds).
+--
+-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms724408%28v=vs.85%29.aspx>
+getTickCount64 :: IO Word64
+getTickCount64 = c_GetTickCount64
+
+-- | Call the @QueryPerformanceCounter@ function.
+--
+-- Problems:
+--
+-- * Might not be available on some hardware. Use 'queryPerformanceFrequency'
+-- to test for availability before calling this function.
+--
+-- * On a multiprocessor computer, may produce different results on
+-- different processors due to hardware bugs.
+--
+-- To get a monotonic time in seconds, divide the result of
+-- 'queryPerformanceCounter' by that of 'queryPerformanceFrequency'.
+--
+-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms644904%28v=vs.85%29.aspx>
+queryPerformanceCounter :: IO Int64
+queryPerformanceCounter =
+ callQP c_QueryPerformanceCounter
+ >>= maybe (throwGetLastError "QueryPerformanceCounter") return
+
+-- | Call the @QueryPerformanceFrequency@ function. Return 'Nothing' if the
+-- hardware does not provide a high-resolution performance counter.
+--
+-- <http://msdn.microsoft.com/en-us/library/windows/desktop/ms644905%28v=vs.85%29.aspx>
+queryPerformanceFrequency :: IO (Maybe Int64)
+queryPerformanceFrequency = do
+ m <- callQP c_QueryPerformanceFrequency
+ case m of
+ Nothing -> return Nothing
+ Just 0 -> return Nothing -- Shouldn't happen; just a safeguard to
+ -- avoid a zero denominator.
+ Just freq -> return (Just freq)
+
+type QPFunc = Ptr Int64 -> IO BOOL
+
+foreign import WINDOWS_CCONV "Windows.h QueryPerformanceCounter"
+ c_QueryPerformanceCounter :: QPFunc
+
+foreign import WINDOWS_CCONV "Windows.h QueryPerformanceFrequency"
+ c_QueryPerformanceFrequency :: QPFunc
+
+callQP :: QPFunc -> IO (Maybe Int64)
+callQP qpfunc =
+ allocaBytes #{size LARGE_INTEGER} $ \ptr -> do
+ ok <- qpfunc ptr
+ if ok then do
+ n <- #{peek LARGE_INTEGER, QuadPart} ptr
+ return (Just n)
+ else
+ return Nothing
+
+------------------------------------------------------------------------
+-- Miscellaneous
+
+type ULONG_PTR = #type ULONG_PTR
+
+throwWinErr :: String -> ErrCode -> IO a
+throwWinErr loc err = do
+ c_SetLastError err
+ Win32.failWith loc err
+
+setLastError :: ErrCode -> IO ()
+setLastError = c_SetLastError
+
+foreign import WINDOWS_CCONV unsafe "windows.h SetLastError"
+ c_SetLastError :: ErrCode -> IO ()
diff --git a/libraries/base/GHC/Event/Windows/ManagedThreadPool.hs b/libraries/base/GHC/Event/Windows/ManagedThreadPool.hs
new file mode 100644
index 0000000000..11c1259257
--- /dev/null
+++ b/libraries/base/GHC/Event/Windows/ManagedThreadPool.hs
@@ -0,0 +1,98 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE DoAndIfThenElse #-}
+{-# LANGUAGE ForeignFunctionInterface #-}
+{-# LANGUAGE EmptyDataDecls #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-------------------------------------------------------------------------------
+-- |
+-- Module : GHC.Event.Windows.ManagedThreadPool
+-- Copyright : (c) Tamar Christina 2019
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries@haskell.org
+-- Stability : experimental
+-- Portability : non-portable
+--
+-- WinIO Windows Managed Thread pool API. This thread pool scales dynamically
+-- based on demand.
+--
+-------------------------------------------------------------------------------
+
+module GHC.Event.Windows.ManagedThreadPool
+ ( ThreadPool(..)
+ , startThreadPool
+ , notifyRunning
+ , notifyWaiting
+ ) where
+
+import Control.Concurrent.MVar
+import Data.Maybe
+import Foreign
+import GHC.Base
+import GHC.Num ((-), (+))
+import GHC.Real (fromIntegral)
+import GHC.Show
+import GHC.Windows
+import qualified GHC.Event.Array as A
+import qualified GHC.Windows as Win32
+import GHC.IO.Handle.Internals (debugIO)
+import GHC.Conc.Sync (forkIO, showThreadId,
+ ThreadId(..), ThreadStatus(..),
+ threadStatus, sharedCAF)
+import System.IO.Unsafe (unsafePerformIO)
+import GHC.RTS.Flags
+
+------------------------------------------------------------------------
+-- Thread spool manager
+
+type WorkerJob = IO ()
+
+-- | Thread pool manager state
+data ThreadPool = ThreadPool
+ { thrMainThread :: Maybe ThreadId
+ , thrMaxThreads :: {-# UNPACK #-} !Int
+ , thrMinThreads :: {-# UNPACK #-} !Int
+ , thrCurThreads :: {-# UNPACK #-} !Int
+ , thrCallBack :: WorkerJob
+ , thrActiveThreads :: MVar Int
+ , thrMonitor :: MVar ()
+ , thrThreadIds :: {-#UNPACK #-} !(A.Array ThreadId)
+ }
+
+startThreadPool :: WorkerJob -> IO ThreadPool
+startThreadPool job = do
+ debugIO "Starting I/O manager threadpool..."
+ let thrMinThreads = 2
+ let thrCurThreads = 0
+ let thrCallBack = job
+ thrMaxThreads <- (fromIntegral . numIoWorkerThreads) `fmap` getMiscFlags
+ thrActiveThreads <- newMVar 0
+ thrMonitor <- newEmptyMVar
+ thrThreadIds <- undefined -- A.new thrMaxThreads
+ let thrMainThread = Nothing
+
+ let !pool = ThreadPool{..}
+ return pool
+
+monitorThreadPool :: MVar () -> IO ()
+monitorThreadPool monitor = do
+ active <- takeMVar monitor
+
+ return ()
+
+notifyRunning :: Maybe ThreadPool -> IO ()
+notifyRunning Nothing = return ()
+notifyRunning (Just pool) = do
+ modifyMVar_ (thrActiveThreads pool) (\x -> return $ x + 1)
+ _ <- tryPutMVar (thrMonitor pool) ()
+ return ()
+
+notifyWaiting :: Maybe ThreadPool -> IO ()
+notifyWaiting Nothing = return ()
+notifyWaiting (Just pool) = do
+ modifyMVar_ (thrActiveThreads pool) (\x -> return $ x - 1)
+ _ <- tryPutMVar (thrMonitor pool) ()
+ return ()
diff --git a/libraries/base/GHC/Event/Windows/Thread.hs b/libraries/base/GHC/Event/Windows/Thread.hs
new file mode 100644
index 0000000000..21d39e14b5
--- /dev/null
+++ b/libraries/base/GHC/Event/Windows/Thread.hs
@@ -0,0 +1,43 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+module GHC.Event.Windows.Thread (
+ ensureIOManagerIsRunning,
+ interruptIOManager,
+ threadDelay,
+ registerDelay,
+) where
+
+import GHC.Conc.Sync
+import GHC.Base
+import GHC.IO
+import GHC.IOPort
+import GHC.Real
+
+import GHC.Event.Windows.Clock
+import GHC.Event.Windows
+
+ensureIOManagerIsRunning :: IO ()
+ensureIOManagerIsRunning = wakeupIOManager
+
+interruptIOManager :: IO ()
+interruptIOManager = interruptSystemManager
+
+threadDelay :: Int -> IO ()
+threadDelay usecs = mask_ $ do
+ m <- newEmptyIOPort
+ mgr <- getSystemManager
+ reg <- registerTimeout mgr secs $ writeIOPort m () >> return ()
+ readIOPort m `onException` unregisterTimeout mgr reg
+ where
+ secs = microsecondsToSeconds usecs
+
+registerDelay :: Int -> IO (TVar Bool)
+registerDelay usecs = do
+ t <- newTVarIO False
+ mgr <- getSystemManager
+ _ <- registerTimeout mgr secs $ atomically $ writeTVar t True
+ return t
+ where
+ secs = microsecondsToSeconds usecs
+
+microsecondsToSeconds :: Int -> Seconds
+microsecondsToSeconds us = fromIntegral us / 1000000.0
diff --git a/libraries/base/GHC/IO/Types.hs b/libraries/base/GHC/IO/Types.hs
new file mode 100644
index 0000000000..3124c48052
--- /dev/null
+++ b/libraries/base/GHC/IO/Types.hs
@@ -0,0 +1,41 @@
+{-# LANGUAGE NoImplicitPrelude, TypeSynonymInstances, FlexibleInstances #-}
+-------------------------------------------------------------------------------
+-- |
+-- Module : GHC.IO.Types
+-- Copyright : (c) Tamar Christina 2018
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries@haskell.org
+-- Stability : experimental
+-- Portability : non-portable
+--
+-- Abstraction over C Handle types for GHC, Unix wants FD (CInt) while Windows
+-- Wants Handle (CIntPtr), so we abstract over them here.
+--
+-------------------------------------------------------------------------------
+
+module GHC.IO.Types
+ ( module GHC.IO.Types
+ , IntPtr
+ , POSIX.Fd) where
+
+import GHC.Base
+import GHC.Num
+import GHC.Real
+
+import Foreign.Ptr (IntPtr, intPtrToPtr)
+import qualified System.Posix.Types as POSIX
+import qualified GHC.Windows as WIN32
+
+-- To keep backwards compatibility with existing code we must use a type
+-- class here due to the different widths of the native handle types of the
+-- platforms.
+class (Num a, Integral a) => BHandle a where
+ toFd :: a -> POSIX.Fd
+ toFd = fromIntegral
+
+ toHandle :: a -> WIN32.HANDLE
+ toHandle = intPtrToPtr . fromIntegral
+
+instance BHandle POSIX.Fd where
+instance BHandle IntPtr where
diff --git a/libraries/base/cbits/consUtils.c b/libraries/base/cbits/consUtils.c
index 5ca0c1b608..ac5d3ea75a 100644
--- a/libraries/base/cbits/consUtils.c
+++ b/libraries/base/cbits/consUtils.c
@@ -1,4 +1,4 @@
-/*
+/*
* (c) The University of Glasgow 2002
*
* Win32 Console API support
@@ -46,7 +46,7 @@ set_console_buffering__(int fd, int cooked)
leave ECHO_INPUT enabled without also having LINE_INPUT,
so we have to turn both off here. */
DWORD flgs = ENABLE_LINE_INPUT | ENABLE_ECHO_INPUT;
-
+
if ( (h = (HANDLE)_get_osfhandle(fd)) != INVALID_HANDLE_VALUE ) {
if ( GetConsoleMode(h,&st) &&
SetConsoleMode(h, cooked ? (st | ENABLE_LINE_INPUT) : st & ~flgs) ) {
diff --git a/libraries/base/include/winio_structs.h b/libraries/base/include/winio_structs.h
new file mode 100644
index 0000000000..da9dab05b7
--- /dev/null
+++ b/libraries/base/include/winio_structs.h
@@ -0,0 +1,40 @@
+/*
+ * (c) Tamar Christina, 2019.
+ *
+ * Structures supporting the IOCP based I/O Manager or Windows.
+ */
+
+#include <Windows.h>
+#include <stdint.h>
+
+#if defined(_WIN64)
+# define ALIGNMENT __attribute__ ((aligned (8)))
+#elif defined(_WIN32)
+# define ALIGNMENT __attribute__ ((aligned (8)))
+#else
+# error "unknown environment, can't determine alignment"
+#endif
+
+/* Completion data structure. Must be kept in sync with that in
+ GHC.Event.Windows or horrible things happen. */
+typedef struct _CompletionData {
+ /* The Handle to the object for which the I/O operation is in progress. */
+ HWND cdHandle;
+ /* Handle to the callback routine to call to notify that an operation has
+ finished. This value is opaque as it shouldn't be accessible
+ outside the Haskell world. */
+ uintptr_t cdCallback;
+} CompletionData, *LPCompletionData;
+
+/* The Windows API Requires an OVERLAPPED struct for asynchronous access,
+ however if we pad the structure we can give extra book keeping information
+ without needing to look these up later. Do not modify this struct unless
+ you know what you're doing. */
+typedef struct _HASKELL_OVERLAPPED {
+ /* Windows OVERLAPPED structure. NOTE: MUST BE FIRST element. */
+ OVERLAPPED hoOverlapped;
+ /* Pointer to additional payload in Haskell land. This will contain a
+ foreign pointer. We only use atomic operations to access this field in
+ order to correctly handle multiple threads using it. */
+ LPCompletionData hoData ALIGNMENT;
+} HASKELL_OVERLAPPED;