summaryrefslogtreecommitdiff
path: root/libraries/base/GHC/Conc/POSIX.hs
blob: 84dc68fc30dc9f743468bd58f968cbe7b77f5fd7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE CPP, NoImplicitPrelude, MagicHash, UnboxedTuples #-}
{-# OPTIONS_GHC -Wno-missing-signatures #-}
{-# OPTIONS_HADDOCK not-home #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  GHC.Conc.POSIX
-- Copyright   :  (c) The University of Glasgow, 1994-2002
-- License     :  see libraries/base/LICENSE
--
-- Maintainer  :  cvs-ghc@haskell.org
-- Stability   :  internal
-- Portability :  non-portable (GHC extensions)
--
-- Windows I/O manager
--
-- This is the I/O manager based on posix FDs for windows.
-- When using the winio manager these functions may not
-- be used as they will behave in unexpected ways.
--
-- TODO: This manager is currently the default. But we will eventually
-- switch to use winio instead.
--
-----------------------------------------------------------------------------

-- #not-home
module GHC.Conc.POSIX
       ( ensureIOManagerIsRunning
       , interruptIOManager

       -- * Waiting
       , threadDelay
       , registerDelay

       -- * Miscellaneous
       , asyncRead
       , asyncWrite
       , asyncDoProc

       , asyncReadBA
       , asyncWriteBA

       , module GHC.Event.Windows.ConsoleEvent
       ) where


#include "windows_cconv.h"

import Data.Bits (shiftR)
import GHC.Base
import GHC.Conc.Sync
import GHC.Conc.POSIX.Const
import GHC.Event.Windows.ConsoleEvent
import GHC.IO (unsafePerformIO)
import GHC.IORef
import GHC.MVar
import GHC.Num (Num(..))
import GHC.Ptr
import GHC.Real (div, fromIntegral)
import GHC.Word (Word32, Word64)
import GHC.Windows
import Unsafe.Coerce ( unsafeCoerceUnlifted )

-- ----------------------------------------------------------------------------
-- Thread waiting

-- Note: threadWaitRead and threadWaitWrite aren't really functional
-- on Win32, but left in there because lib code (still) uses them (the manner
-- in which they're used doesn't cause problems on a Win32 platform though.)

asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead  (I# fd) (I# isSock) (I# len) (Ptr buf) =
  IO $ \s -> case asyncRead# fd isSock len buf s of
               (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)

asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite  (I# fd) (I# isSock) (I# len) (Ptr buf) =
  IO $ \s -> case asyncWrite# fd isSock len buf s of
               (# s', len#, err# #) -> (# s', (I# len#, I# err#) #)

asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc) (Ptr param) =
    -- the 'length' value is ignored; simplifies implementation of
    -- the async*# primops to have them all return the same result.
  IO $ \s -> case asyncDoProc# proc param s  of
               (# s', _len#, err# #) -> (# s', I# err# #)

-- to aid the use of these primops by the IO Handle implementation,
-- provide the following convenience funs:

-- this better be a pinned byte array!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB =
  asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerceUnlifted bufB))) `plusPtr` off)

asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB =
  asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerceUnlifted bufB))) `plusPtr` off)

-- ----------------------------------------------------------------------------
-- Threaded RTS implementation of threadDelay

-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
-- There is no guarantee that the thread will be rescheduled promptly
-- when the delay has expired, but the thread will never continue to
-- run /earlier/ than specified.
--
threadDelay :: Int -> IO ()
threadDelay time
  | threaded  = waitForDelayEvent time
  | otherwise = IO $ \s ->
        case time of { I# time# ->
        case delay# time# s of { s' -> (# s', () #)
        }}

-- | Set the value of returned TVar to True after a given number of
-- microseconds. The caveats associated with threadDelay also apply.
--
registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs
  | threaded = waitForDelayEventSTM usecs
  | otherwise = errorWithoutStackTrace "registerDelay: requires -threaded"

foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool

waitForDelayEvent :: Int -> IO ()
waitForDelayEvent usecs = do
  m <- newEmptyMVar
  target <- calculateTarget usecs
  _ <- atomicModifyIORef'_ pendingDelays (\xs -> Delay target m : xs)
  prodServiceThread
  takeMVar m

-- Delays for use in STM
waitForDelayEventSTM :: Int -> IO (TVar Bool)
waitForDelayEventSTM usecs = do
   t <- atomically $ newTVar False
   target <- calculateTarget usecs
   _ <- atomicModifyIORef'_ pendingDelays (\xs -> DelaySTM target t : xs)
   prodServiceThread
   return t

calculateTarget :: Int -> IO USecs
calculateTarget usecs = do
    now <- getMonotonicUSec
    return $ now + (fromIntegral usecs)

data DelayReq
  = Delay    {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
  | DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)

{-# NOINLINE pendingDelays #-}
pendingDelays :: IORef [DelayReq]
pendingDelays = unsafePerformIO $ do
   m <- newIORef []
   sharedCAF m getOrSetGHCConcWindowsPendingDelaysStore

foreign import ccall unsafe "getOrSetGHCConcWindowsPendingDelaysStore"
    getOrSetGHCConcWindowsPendingDelaysStore :: Ptr a -> IO (Ptr a)

{-# NOINLINE ioManagerThread #-}
ioManagerThread :: MVar (Maybe ThreadId)
ioManagerThread = unsafePerformIO $ do
   m <- newMVar Nothing
   sharedCAF m getOrSetGHCConcWindowsIOManagerThreadStore

foreign import ccall unsafe "getOrSetGHCConcWindowsIOManagerThreadStore"
    getOrSetGHCConcWindowsIOManagerThreadStore :: Ptr a -> IO (Ptr a)

ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning
  | threaded  = startIOManagerThread
  | otherwise = return ()

interruptIOManager :: IO ()
interruptIOManager = return ()

startIOManagerThread :: IO ()
startIOManagerThread = do
  modifyMVar_ ioManagerThread $ \old -> do
    let create = do t <- forkIO ioManager;
                    labelThread t "IOManagerThread";
                    return (Just t)
    case old of
      Nothing -> create
      Just t  -> do
        s <- threadStatus t
        case s of
          ThreadFinished -> create
          ThreadDied     -> create
          _other         -> return (Just t)

insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay d [] = [d]
insertDelay d1 ds@(d2 : rest)
  | delayTime d1 <= delayTime d2 = d1 : ds
  | otherwise                    = d2 : insertDelay d1 rest

delayTime :: DelayReq -> USecs
delayTime (Delay t _) = t
delayTime (DelaySTM t _) = t

type USecs = Word64
type NSecs = Word64

foreign import ccall unsafe "getMonotonicNSec"
  getMonotonicNSec :: IO NSecs

getMonotonicUSec :: IO USecs
getMonotonicUSec = fmap (`div` 1000) getMonotonicNSec

{-# NOINLINE prodding #-}
prodding :: IORef Bool
prodding = unsafePerformIO $ do
   r <- newIORef False
   sharedCAF r getOrSetGHCConcWindowsProddingStore

foreign import ccall unsafe "getOrSetGHCConcWindowsProddingStore"
    getOrSetGHCConcWindowsProddingStore :: Ptr a -> IO (Ptr a)

prodServiceThread :: IO ()
prodServiceThread = do
  -- NB. use atomicSwapIORef here, otherwise there are race
  -- conditions in which prodding is left at True but the server is
  -- blocked in select().
  was_set <- atomicSwapIORef prodding True
  when (not was_set) wakeupIOManager

-- ----------------------------------------------------------------------------
-- Windows IO manager thread

ioManager :: IO ()
ioManager = do
  wakeup <- c_getIOManagerEvent
  service_loop wakeup []

service_loop :: HANDLE          -- read end of pipe
             -> [DelayReq]      -- current delay requests
             -> IO ()

service_loop wakeup old_delays = do
  -- pick up new delay requests
  new_delays <- atomicSwapIORef pendingDelays []
  let  delays = foldr insertDelay old_delays new_delays

  now <- getMonotonicUSec
  (delays', timeout) <- getDelay now delays

  r <- c_WaitForSingleObject wakeup timeout
  case r of
    0xffffffff -> do throwGetLastError "service_loop"
    0 -> do
        r2 <- c_readIOManagerEvent
        exit <-
              case r2 of
                _ | r2 == io_MANAGER_WAKEUP -> return False
                _ | r2 == io_MANAGER_DIE    -> return True
                0 -> return False -- spurious wakeup
                _ -> do start_console_handler (r2 `shiftR` 1); return False
        when (not exit) $ service_cont wakeup delays'

    _other -> service_cont wakeup delays' -- probably timeout

service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays = do
  _ <- atomicSwapIORef prodding False
  service_loop wakeup delays

wakeupIOManager :: IO ()
wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP

-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for.  The queue of pending
-- delays is kept ordered.
getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
getDelay _   [] = return ([], iNFINITE)
getDelay now all@(d : rest)
  = case d of
     Delay time m | now >= time -> do
        putMVar m ()
        getDelay now rest
     DelaySTM time t | now >= time -> do
        atomically $ writeTVar t True
        getDelay now rest
     _otherwise ->
        -- delay is in millisecs for WaitForSingleObject
        let micro_seconds = delayTime d - now
            milli_seconds = (micro_seconds + 999) `div` 1000
        in return (all, fromIntegral milli_seconds)

foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
  c_getIOManagerEvent :: IO HANDLE

foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
  c_readIOManagerEvent :: IO Word32

foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
  c_sendIOManagerEvent :: Word32 -> IO ()

foreign import WINDOWS_CCONV "WaitForSingleObject"
   c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD