summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--compiler/GHC/Builtin/Names.hs13
-rw-r--r--compiler/GHC/Builtin/Types/Prim.hs20
-rw-r--r--compiler/GHC/Builtin/primops.txt.pp39
-rw-r--r--compiler/GHC/StgToCmm/Prim.hs4
-rw-r--r--includes/rts/Constants.h6
-rw-r--r--includes/rts/storage/TSO.h1
-rw-r--r--includes/stg/MiscClosures.h4
-rw-r--r--libraries/base/GHC/Conc/Sync.hs3
-rw-r--r--libraries/base/GHC/IOPort.hs100
-rw-r--r--libraries/base/base.cabal4
-rw-r--r--libraries/ghc-heap/GHC/Exts/Heap/Closures.hs11
-rw-r--r--rts/PrimOps.cmm173
-rw-r--r--rts/RaiseAsync.c16
-rw-r--r--rts/RtsSymbols.c3
-rw-r--r--rts/Schedule.c37
-rw-r--r--rts/Schedule.h2
-rw-r--r--rts/Threads.c5
-rw-r--r--rts/Trace.c1
-rw-r--r--rts/TraverseHeap.c1
-rw-r--r--rts/sm/Compact.c1
-rw-r--r--rts/sm/Sanity.c1
-rw-r--r--rts/sm/Scav.c1
-rw-r--r--utils/genprimopcode/Main.hs2
23 files changed, 429 insertions, 19 deletions
diff --git a/compiler/GHC/Builtin/Names.hs b/compiler/GHC/Builtin/Names.hs
index 02a10d4b35..b9ef184923 100644
--- a/compiler/GHC/Builtin/Names.hs
+++ b/compiler/GHC/Builtin/Names.hs
@@ -1747,7 +1747,7 @@ addrPrimTyConKey, arrayPrimTyConKey, arrayArrayPrimTyConKey, boolTyConKey,
weakPrimTyConKey, mutableArrayPrimTyConKey, mutableArrayArrayPrimTyConKey,
mutableByteArrayPrimTyConKey, orderingTyConKey, mVarPrimTyConKey,
ratioTyConKey, rationalTyConKey, realWorldTyConKey, stablePtrPrimTyConKey,
- stablePtrTyConKey, eqTyConKey, heqTyConKey,
+ stablePtrTyConKey, eqTyConKey, heqTyConKey, ioPortPrimTyConKey,
smallArrayPrimTyConKey, smallMutableArrayPrimTyConKey,
stringTyConKey :: Unique
addrPrimTyConKey = mkPreludeTyConUnique 1
@@ -1783,11 +1783,12 @@ mutableArrayPrimTyConKey = mkPreludeTyConUnique 30
mutableByteArrayPrimTyConKey = mkPreludeTyConUnique 31
orderingTyConKey = mkPreludeTyConUnique 32
mVarPrimTyConKey = mkPreludeTyConUnique 33
-ratioTyConKey = mkPreludeTyConUnique 34
-rationalTyConKey = mkPreludeTyConUnique 35
-realWorldTyConKey = mkPreludeTyConUnique 36
-stablePtrPrimTyConKey = mkPreludeTyConUnique 37
-stablePtrTyConKey = mkPreludeTyConUnique 38
+ioPortPrimTyConKey = mkPreludeTyConUnique 34
+ratioTyConKey = mkPreludeTyConUnique 35
+rationalTyConKey = mkPreludeTyConUnique 36
+realWorldTyConKey = mkPreludeTyConUnique 37
+stablePtrPrimTyConKey = mkPreludeTyConUnique 38
+stablePtrTyConKey = mkPreludeTyConUnique 39
eqTyConKey = mkPreludeTyConUnique 40
heqTyConKey = mkPreludeTyConUnique 41
arrayArrayPrimTyConKey = mkPreludeTyConUnique 42
diff --git a/compiler/GHC/Builtin/Types/Prim.hs b/compiler/GHC/Builtin/Types/Prim.hs
index 88ef943a64..13f08739d0 100644
--- a/compiler/GHC/Builtin/Types/Prim.hs
+++ b/compiler/GHC/Builtin/Types/Prim.hs
@@ -62,6 +62,7 @@ module GHC.Builtin.Types.Prim(
mutVarPrimTyCon, mkMutVarPrimTy,
mVarPrimTyCon, mkMVarPrimTy,
+ ioPortPrimTyCon, mkIOPortPrimTy,
tVarPrimTyCon, mkTVarPrimTy,
stablePtrPrimTyCon, mkStablePtrPrimTy,
stableNamePrimTyCon, mkStableNamePrimTy,
@@ -171,6 +172,7 @@ exposedPrimTyCons
, mutableArrayArrayPrimTyCon
, smallMutableArrayPrimTyCon
, mVarPrimTyCon
+ , ioPortPrimTyCon
, tVarPrimTyCon
, mutVarPrimTyCon
, realWorldTyCon
@@ -207,7 +209,7 @@ mkBuiltInPrimTc fs unique tycon
BuiltInSyntax
-charPrimTyConName, intPrimTyConName, int8PrimTyConName, int16PrimTyConName, int32PrimTyConName, int64PrimTyConName, wordPrimTyConName, word32PrimTyConName, word8PrimTyConName, word16PrimTyConName, word64PrimTyConName, addrPrimTyConName, floatPrimTyConName, doublePrimTyConName, statePrimTyConName, proxyPrimTyConName, realWorldTyConName, arrayPrimTyConName, arrayArrayPrimTyConName, smallArrayPrimTyConName, byteArrayPrimTyConName, mutableArrayPrimTyConName, mutableByteArrayPrimTyConName, mutableArrayArrayPrimTyConName, smallMutableArrayPrimTyConName, mutVarPrimTyConName, mVarPrimTyConName, tVarPrimTyConName, stablePtrPrimTyConName, stableNamePrimTyConName, compactPrimTyConName, bcoPrimTyConName, weakPrimTyConName, threadIdPrimTyConName, eqPrimTyConName, eqReprPrimTyConName, eqPhantPrimTyConName, voidPrimTyConName :: Name
+charPrimTyConName, intPrimTyConName, int8PrimTyConName, int16PrimTyConName, int32PrimTyConName, int64PrimTyConName, wordPrimTyConName, word32PrimTyConName, word8PrimTyConName, word16PrimTyConName, word64PrimTyConName, addrPrimTyConName, floatPrimTyConName, doublePrimTyConName, statePrimTyConName, proxyPrimTyConName, realWorldTyConName, arrayPrimTyConName, arrayArrayPrimTyConName, smallArrayPrimTyConName, byteArrayPrimTyConName, mutableArrayPrimTyConName, mutableByteArrayPrimTyConName, mutableArrayArrayPrimTyConName, smallMutableArrayPrimTyConName, mutVarPrimTyConName, mVarPrimTyConName, ioPortPrimTyConName, tVarPrimTyConName, stablePtrPrimTyConName, stableNamePrimTyConName, compactPrimTyConName, bcoPrimTyConName, weakPrimTyConName, threadIdPrimTyConName, eqPrimTyConName, eqReprPrimTyConName, eqPhantPrimTyConName, voidPrimTyConName :: Name
charPrimTyConName = mkPrimTc (fsLit "Char#") charPrimTyConKey charPrimTyCon
intPrimTyConName = mkPrimTc (fsLit "Int#") intPrimTyConKey intPrimTyCon
int8PrimTyConName = mkPrimTc (fsLit "Int8#") int8PrimTyConKey int8PrimTyCon
@@ -238,6 +240,7 @@ mutableByteArrayPrimTyConName = mkPrimTc (fsLit "MutableByteArray#") mutableByte
mutableArrayArrayPrimTyConName= mkPrimTc (fsLit "MutableArrayArray#") mutableArrayArrayPrimTyConKey mutableArrayArrayPrimTyCon
smallMutableArrayPrimTyConName= mkPrimTc (fsLit "SmallMutableArray#") smallMutableArrayPrimTyConKey smallMutableArrayPrimTyCon
mutVarPrimTyConName = mkPrimTc (fsLit "MutVar#") mutVarPrimTyConKey mutVarPrimTyCon
+ioPortPrimTyConName = mkPrimTc (fsLit "IOPort#") ioPortPrimTyConKey ioPortPrimTyCon
mVarPrimTyConName = mkPrimTc (fsLit "MVar#") mVarPrimTyConKey mVarPrimTyCon
tVarPrimTyConName = mkPrimTc (fsLit "TVar#") tVarPrimTyConKey tVarPrimTyCon
stablePtrPrimTyConName = mkPrimTc (fsLit "StablePtr#") stablePtrPrimTyConKey stablePtrPrimTyCon
@@ -1006,7 +1009,22 @@ mkMutVarPrimTy s elt = TyConApp mutVarPrimTyCon [s, elt]
{-
************************************************************************
* *
+\subsection[TysPrim-io-port-var]{The synchronizing I/O Port type}
+* *
+************************************************************************
+-}
+
+ioPortPrimTyCon :: TyCon
+ioPortPrimTyCon = pcPrimTyCon ioPortPrimTyConName [Nominal, Representational] UnliftedRep
+
+mkIOPortPrimTy :: Type -> Type -> Type
+mkIOPortPrimTy s elt = TyConApp ioPortPrimTyCon [s, elt]
+
+{-
+************************************************************************
+* *
The synchronizing variable type
+\subsection[TysPrim-synch-var]{The synchronizing variable type}
* *
************************************************************************
-}
diff --git a/compiler/GHC/Builtin/primops.txt.pp b/compiler/GHC/Builtin/primops.txt.pp
index a12ac1f29c..261d02aa67 100644
--- a/compiler/GHC/Builtin/primops.txt.pp
+++ b/compiler/GHC/Builtin/primops.txt.pp
@@ -2827,6 +2827,45 @@ primop IsEmptyMVarOp "isEmptyMVar#" GenPrimOp
out_of_line = True
has_side_effects = True
+
+------------------------------------------------------------------------
+section "Synchronized I/O Ports"
+ {Operations on {\tt IOPort\#}s. }
+------------------------------------------------------------------------
+
+primtype IOPort# s a
+ { A shared I/O port is almost the same as a {\tt MVar\#}!).
+ The main difference is that IOPort has no deadlock detection or
+ deadlock breaking code that forcibly releases the lock. }
+
+primop NewIOPortrOp "newIOPort#" GenPrimOp
+ State# s -> (# State# s, IOPort# s a #)
+ {Create new {\tt IOPort\#}; initially empty.}
+ with
+ out_of_line = True
+ has_side_effects = True
+
+primop ReadIOPortOp "readIOPort#" GenPrimOp
+ IOPort# s a -> State# s -> (# State# s, a #)
+ {If {\tt IOPort\#} is empty, block until it becomes full.
+ Then remove and return its contents, and set it empty.}
+ with
+ out_of_line = True
+ has_side_effects = True
+
+primop WriteIOPortOp "writeIOPort#" GenPrimOp
+ IOPort# s a -> a -> State# s -> (# State# s, Int# #)
+ {If {\tt IOPort\#} is full, immediately return with integer 0.
+ Otherwise, store value arg as {\tt IOPort\#}'s new contents,
+ and return with integer 1. }
+ with
+ out_of_line = True
+ has_side_effects = True
+
+primop SameIOPortOp "sameIOPort#" GenPrimOp
+ IOPort# s a -> IOPort# s a -> Int#
+
+
------------------------------------------------------------------------
section "Delay/wait operations"
------------------------------------------------------------------------
diff --git a/compiler/GHC/StgToCmm/Prim.hs b/compiler/GHC/StgToCmm/Prim.hs
index ef5e376be8..afbcc34836 100644
--- a/compiler/GHC/StgToCmm/Prim.hs
+++ b/compiler/GHC/StgToCmm/Prim.hs
@@ -1320,6 +1320,7 @@ emitPrimOp dflags primop = case primop of
SameMutVarOp -> \args -> opTranslate args (mo_wordEq platform)
SameMVarOp -> \args -> opTranslate args (mo_wordEq platform)
+ SameIOPortOp -> \args -> opTranslate args (mo_wordEq platform)
SameMutableArrayOp -> \args -> opTranslate args (mo_wordEq platform)
SameMutableByteArrayOp -> \args -> opTranslate args (mo_wordEq platform)
SameMutableArrayArrayOp -> \args -> opTranslate args (mo_wordEq platform)
@@ -1467,6 +1468,9 @@ emitPrimOp dflags primop = case primop of
ReadMVarOp -> alwaysExternal
TryReadMVarOp -> alwaysExternal
IsEmptyMVarOp -> alwaysExternal
+ NewIOPortrOp -> alwaysExternal
+ ReadIOPortOp -> alwaysExternal
+ WriteIOPortOp -> alwaysExternal
DelayOp -> alwaysExternal
WaitReadOp -> alwaysExternal
WaitWriteOp -> alwaysExternal
diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
index c2cad8fc80..f1ca25a6f3 100644
--- a/includes/rts/Constants.h
+++ b/includes/rts/Constants.h
@@ -256,7 +256,11 @@
by tryWakeupThread() */
#define ThreadMigrating 13
-/* WARNING WARNING top number is BlockedOnMVarRead 14, not 13!! */
+/* Lightweight non-deadlock checked version of MVar. Used for the why_blocked
+ field of a TSO. */
+#define BlockedOnIOCompletion 15
+
+/* Next number is 16. */
/*
* These constants are returned to the scheduler by a thread that has
diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
index 3a488d97b5..33eebffc7c 100644
--- a/includes/rts/storage/TSO.h
+++ b/includes/rts/storage/TSO.h
@@ -288,6 +288,7 @@ void dirty_STACK (Capability *cap, StgStack *stack);
BlockedOnBlackHole MessageBlackHole * TSO->bq
BlockedOnMVar the MVAR the MVAR's queue
+ BlockedOnIOCompletion the PortEVent the IOCP's queue
BlockedOnSTM END_TSO_QUEUE STM wait queue(s)
BlockedOnSTM STM_AWOKEN run queue
diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
index dc2b0715ca..5ffdd5cd7b 100644
--- a/includes/stg/MiscClosures.h
+++ b/includes/stg/MiscClosures.h
@@ -337,6 +337,10 @@ RTS_FUN_DECL(stg_block_stmwait);
RTS_FUN_DECL(stg_block_throwto);
RTS_RET(stg_block_throwto);
+RTS_FUN_DECL(stg_readIOPortzh);
+RTS_FUN_DECL(stg_writeIOPortzh);
+RTS_FUN_DECL(stg_newIOPortzh);
+
/* Entry/exit points from StgStartup.cmm */
RTS_RET(stg_stop_thread);
diff --git a/libraries/base/GHC/Conc/Sync.hs b/libraries/base/GHC/Conc/Sync.hs
index d6ffbc2de9..80287c56c4 100644
--- a/libraries/base/GHC/Conc/Sync.hs
+++ b/libraries/base/GHC/Conc/Sync.hs
@@ -538,6 +538,8 @@ data BlockReason
-- ^blocked in 'retry' in an STM transaction
| BlockedOnForeignCall
-- ^currently in a foreign call
+ | BlockedOnIOCompletion
+ -- ^currently blocked on an I/O Completion port
| BlockedOnOther
-- ^blocked on some other resource. Without @-threaded@,
-- I\/O and 'Control.Concurrent.threadDelay' show up as
@@ -576,6 +578,7 @@ threadStatus (ThreadId t) = IO $ \s ->
mk_stat 11 = ThreadBlocked BlockedOnForeignCall
mk_stat 12 = ThreadBlocked BlockedOnException
mk_stat 14 = ThreadBlocked BlockedOnMVar -- possibly: BlockedOnMVarRead
+ mk_stat 15 = ThreadBlocked BlockedOnIOCompletion
-- NB. these are hardcoded in rts/PrimOps.cmm
mk_stat 16 = ThreadFinished
mk_stat 17 = ThreadDied
diff --git a/libraries/base/GHC/IOPort.hs b/libraries/base/GHC/IOPort.hs
new file mode 100644
index 0000000000..e4890d0989
--- /dev/null
+++ b/libraries/base/GHC/IOPort.hs
@@ -0,0 +1,100 @@
+{-# LANGUAGE Unsafe #-}
+{-# LANGUAGE NoImplicitPrelude, MagicHash, UnboxedTuples #-}
+{-# OPTIONS_GHC -funbox-strict-fields #-}
+{-# OPTIONS_HADDOCK hide #-}
+
+-----------------------------------------------------------------------------
+-- |
+-- Module : GHC.IOPort
+-- Copyright : (c) Tamar Christina 2019
+-- License : see libraries/base/LICENSE
+--
+-- Maintainer : cvs-ghc@haskell.org
+-- Stability : internal
+-- Portability : non-portable (GHC Extensions)
+--
+-- The IOPort type. This is a synchronization primitive similar to IOVar but
+-- without any of the deadlock guarantees that IOVar provides. The ports are
+-- single write/multiple wait. Writing to an already full Port will not queue
+-- the value but instead will discard it.
+--
+--
+-----------------------------------------------------------------------------
+
+module GHC.IOPort (
+ -- * IOPorts
+ IOPort(..)
+ , newIOPort
+ , newEmptyIOPort
+ , readIOPort
+ , writeIOPort
+ ) where
+
+import GHC.Base
+
+data IOPort a = IOPort (IOPort# RealWorld a)
+{- ^
+An 'IOPort' is a synchronising variable, used
+for communication between concurrent threads, where it one of the threads is
+controlled by an external state. e.g. by an I/O action that is serviced by the
+runtime. It can be thought of as a box, which may be empty or full.
+
+It is mostly similar to the behavior of MVar except writeIOPort doesn't block if
+the variable is full and the GC won't forcibly release the lock if it thinks
+there's a deadlock.
+-}
+
+-- | @since 4.1.0.0
+instance Eq (IOPort a) where
+ (IOPort ioport1#) == (IOPort ioport2#) =
+ isTrue# (sameIOPort# ioport1# ioport2#)
+
+{-
+M-Vars are rendezvous points for concurrent threads. They begin
+empty, and any attempt to read an empty M-Var blocks. When an M-Var
+is written, a single blocked thread may be freed. Reading an M-Var
+toggles its state from full back to empty. Therefore, any value
+written to an M-Var may only be read once. Multiple reads and writes
+are allowed, but there must be at least one read between any two
+writes.
+-}
+
+-- |Create an 'IOPort' which is initially empty.
+newEmptyIOPort :: IO (IOPort a)
+newEmptyIOPort = IO $ \ s# ->
+ case newIOPort# s# of
+ (# s2#, svar# #) -> (# s2#, IOPort svar# #)
+
+-- |Create an 'IOPort' which contains the supplied value.
+newIOPort :: a -> IO (IOPort a)
+newIOPort value =
+ newEmptyIOPort >>= \ ioport ->
+ writeIOPort ioport value >>
+ return ioport
+
+-- |Atomically read the the contents of the 'IOPort'. If the 'IOPort' is
+-- currently empty, 'readIOPort' will wait until it is full. After a
+-- 'readIOPort', the 'IOPort' is left empty.
+-- TODO: Figure out how to make this an exception for better debugging.
+--
+-- There is one important property of 'readIOPort':
+--
+-- * Only a single threads can be blocked on an 'IOPort', The second thread
+-- attempting to block will be silently ignored.
+--
+readIOPort :: IOPort a -> IO a
+readIOPort (IOPort ioport#) = IO $ \ s# -> readIOPort# ioport# s#
+
+-- |Put a value into an 'IOPort'. If the 'IOPort' is currently full,
+-- 'writeIOPort' will return False and not block.
+--
+-- There is one important property of 'writeIOPort':
+--
+-- * Only a single thread can be blocked on an 'IOPort'.
+--
+writeIOPort :: IOPort a -> a -> IO Bool
+writeIOPort (IOPort ioport#) x = IO $ \ s# ->
+ case writeIOPort# ioport# x s# of
+ (# s, 0# #) -> (# s, False #)
+ (# s, _ #) -> (# s, True #)
+
diff --git a/libraries/base/base.cabal b/libraries/base/base.cabal
index aee0c20d29..1d4178a2bf 100644
--- a/libraries/base/base.cabal
+++ b/libraries/base/base.cabal
@@ -308,6 +308,8 @@ Library
Type.Reflection
Type.Reflection.Unsafe
Unsafe.Coerce
+ -- TODO: remove
+ GHC.IOPort
reexported-modules:
GHC.Num.Integer
@@ -328,6 +330,8 @@ Library
GHC.IO.Handle.Lock.NoOp
GHC.IO.Handle.Lock.Windows
GHC.StaticPtr.Internal
+ GHC.Event.Internal.Types
+ -- GHC.IOPort -- TODO: hide again after debug
System.Environment.ExecutablePath
System.CPUTime.Utils
diff --git a/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs b/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs
index bb9d440b37..8a959fc2a0 100644
--- a/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs
+++ b/libraries/ghc-heap/GHC/Exts/Heap/Closures.hs
@@ -229,7 +229,15 @@ data GenClosure b
}
-- | An @MVar#@, with a queue of thread state objects blocking on them
- | MVarClosure
+ | MVarClosure
+ { info :: !StgInfoTable
+ , queueHead :: !b -- ^ Pointer to head of queue
+ , queueTail :: !b -- ^ Pointer to tail of queue
+ , value :: !b -- ^ Pointer to closure
+ }
+
+ -- | An @IOPort#@, with a queue of thread state objects blocking on them
+ | IOPortClosure
{ info :: !StgInfoTable
, queueHead :: !b -- ^ Pointer to head of queue
, queueTail :: !b -- ^ Pointer to tail of queue
@@ -340,6 +348,7 @@ allClosures (MutArrClosure {..}) = mccPayload
allClosures (SmallMutArrClosure {..}) = mccPayload
allClosures (MutVarClosure {..}) = [var]
allClosures (MVarClosure {..}) = [queueHead,queueTail,value]
+allClosures (IOPortClosure {..}) = [queueHead,queueTail,value]
allClosures (FunClosure {..}) = ptrArgs
allClosures (BlockingQueueClosure {..}) = [link, blackHole, owner, queue]
allClosures (WeakClosure {..}) = [cfinalizers, key, value, finalizer, link]
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index e13e89b98c..0b1b1419a1 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -1998,6 +1998,179 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
}
/* -----------------------------------------------------------------------------
+ * IOPort primitives
+ *
+ * readIOPort & writeIOPort work as follows. Firstly, an important invariant:
+ *
+ * If the IOPort is full, then the request is silently dropped and the
+ * message is lost. If the IOPort is empty then the
+ * blocking queue contains only the thread blocked on IOPort. An IOPort only
+ * supports a single read and a single write to it.
+ *
+ * readIOPort:
+ * IOPort empty : then add ourselves to the blocking queue
+ * IOPort full : remove the value from the IOPort, and
+ * blocking queue empty : return
+ * blocking queue non-empty : perform the only blocked
+ * writeIOPort from the queue, and
+ * wake up the thread
+ * (IOPort is now empty)
+ *
+ * writeIOPort is just the dual of the above algorithm.
+ *
+ * How do we "perform a writeIOPort"? Well, By storing the value and prt on the
+ * stack, same way we do with MVars. Semantically the operations mutate the
+ * stack the same way so we will re-use the logic and datastructures for MVars
+ * for IOPort. See stg_block_putmvar and stg_block_takemvar in HeapStackCheck.c
+ * for the stack layout, and the PerformPut and PerformTake macros below. We
+ * also re-use the closure types MVAR_CLEAN/_DIRTY for IOPort.
+ *
+ * The remaining caveats of MVar thus also apply for an IOPort. The main
+ * crucial difference between an MVar and IOPort is that the scheduler will not
+ * be allowed to interrupt a blocked IOPort just because it thinks there's a
+ * deadlock. This is especially crucial for the non-threaded runtime.
+ *
+ * -------------------------------------------------------------------------- */
+
+stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
+{
+ W_ val, info, tso, q;
+
+ LOCK_CLOSURE(ioport, info);
+
+ /* If the MVar is empty, put ourselves on the blocked readers
+ * list and wait until we're woken up.
+ */
+ if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) {
+
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+ }
+
+ ALLOC_PRIM_WITH_CUSTOM_FAILURE
+ (SIZEOF_StgMVarTSOQueue,
+ unlockClosure(ioport, stg_MVAR_DIRTY_info);
+ GC_PRIM_P(stg_readIOPortzh, ioport));
+
+ q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+ // readIOPorts are pushed to the front of the queue, so
+ // they get handled immediately
+ SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ StgMVarTSOQueue_link(q) = StgMVar_head(ioport);
+ StgMVarTSOQueue_tso(q) = CurrentTSO;
+
+ StgTSO__link(CurrentTSO) = q;
+ StgTSO_block_info(CurrentTSO) = ioport;
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16;
+ StgMVar_head(ioport) = q;
+
+ if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(ioport) = q;
+ }
+
+ jump stg_block_readmvar(ioport);
+ }
+
+ val = StgMVar_value(ioport);
+
+ unlockClosure(ioport, info);
+ return (val);
+}
+
+stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */
+ P_ val, /* :: a */ )
+{
+ W_ info, tso, q;
+
+ LOCK_CLOSURE(ioport, info);
+
+ /* If there is already a value in the queue, then silently ignore the
+ second put. TODO: Correct usages of IOPort should never have a second
+ put, so perhaps raise an error instead, but I have no idea how to do this
+ safely and correctly at this point. */
+ if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) {
+ unlockClosure(ioport, info);
+ return (0);
+ }
+
+ q = StgMVar_head(ioport);
+loop:
+ if (q == stg_END_TSO_QUEUE_closure) {
+ /* No further takes, the IOPort is now full. */
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+ }
+ StgMVar_value(ioport) = val;
+ unlockClosure(ioport, stg_MVAR_DIRTY_info);
+ return (1);
+ }
+ if (StgHeader_info(q) == stg_IND_info ||
+ StgHeader_info(q) == stg_MSG_NULL_info) {
+ q = StgInd_indirectee(q);
+ goto loop;
+ }
+
+ // There are readIOPort(s) waiting: wake up the first one
+
+ tso = StgMVarTSOQueue_tso(q);
+ StgMVar_head(ioport) = StgMVarTSOQueue_link(q);
+ if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
+ }
+
+ ASSERT(StgTSO_block_info(tso) == ioport);
+ // save why_blocked here, because waking up the thread destroys
+ // this information
+ W_ why_blocked;
+ why_blocked = TO_W_(StgTSO_why_blocked(tso));
+
+ // actually perform the takeMVar
+ W_ stack;
+ stack = StgTSO_stackobj(tso);
+ PerformTake(stack, val);
+
+ // indicate that the MVar operation has now completed.
+ StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+
+ if (TO_W_(StgStack_dirty(stack)) == 0) {
+ ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
+ }
+
+ ccall tryWakeupThread(MyCapability() "ptr", tso);
+
+ // If it was a readIOPort, then we can still do work,
+ // so loop back. (XXX: This could take a while)
+ if (why_blocked == BlockedOnIOCompletion) {
+ q = StgMVarTSOQueue_link(q);
+ goto loop;
+ }
+
+ ASSERT(why_blocked == BlockedOnIOCompletion);
+
+ unlockClosure(ioport, info);
+ return (1);
+}
+/* -----------------------------------------------------------------------------
+ IOPort primitives
+ -------------------------------------------------------------------------- */
+
+stg_newIOPortzh ( gcptr init )
+{
+ W_ ioport;
+
+ ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newIOPortzh);
+
+ ioport = Hp - SIZEOF_StgMVar + WDS(1);
+ SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS);
+ // MVARs start dirty: generation 0 has no mutable list
+ StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
+ StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
+ StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure;
+ return (ioport);
+}
+
+/* -----------------------------------------------------------------------------
Stable pointer primitives
------------------------------------------------------------------------- */
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index e8a6a81747..719c05435d 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -174,11 +174,11 @@ throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception)
- or it is masking exceptions (TSO_BLOCKEX)
- Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
- BlockedOnBlackHole then we acquire ownership of the TSO by locking
- its parent container (e.g. the MVar) and then raise the exception.
- We might change these cases to be more message-passing-like in the
- future.
+ Currently, if the target is BlockedOnMVar, BlockedOnSTM,
+ BlockedOnIOCompletion or BlockedOnBlackHole then we acquire ownership of the
+ TSO by locking its parent container (e.g. the MVar) and then raise the
+ exception. We might change these cases to be more message-passing-like in
+ the future.
Returns:
@@ -343,6 +343,7 @@ check_target:
case BlockedOnMVar:
case BlockedOnMVarRead:
+ case BlockedOnIOCompletion:
{
/*
To establish ownership of this TSO, we need to acquire a
@@ -367,7 +368,9 @@ check_target:
// we have the MVar, let's check whether the thread
// is still blocked on the same MVar.
- if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
+ if ((target->why_blocked != BlockedOnMVar
+ && target->why_blocked != BlockedOnMVarRead
+ && target->why_blocked != BlockedOnIOCompletion)
|| (StgMVar *)target->block_info.closure != mvar) {
unlockClosure((StgClosure *)mvar, info);
goto retry;
@@ -679,6 +682,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
case BlockedOnMVar:
case BlockedOnMVarRead:
+ case BlockedOnIOCompletion:
removeFromMVarBlockedQueue(tso);
goto done;
diff --git a/rts/RtsSymbols.c b/rts/RtsSymbols.c
index ff32932ea9..0f0bd56c82 100644
--- a/rts/RtsSymbols.c
+++ b/rts/RtsSymbols.c
@@ -706,6 +706,9 @@
SymI_HasProto(stg_newMVarzh) \
SymI_HasProto(stg_newMutVarzh) \
SymI_HasProto(stg_newTVarzh) \
+ SymI_HasProto(stg_readIOPortzh) \
+ SymI_HasProto(stg_writeIOPortzh) \
+ SymI_HasProto(stg_newIOPortzh) \
SymI_HasProto(stg_noDuplicatezh) \
SymI_HasProto(stg_atomicModifyMutVar2zh) \
SymI_HasProto(stg_atomicModifyMutVarzuzh) \
diff --git a/rts/Schedule.c b/rts/Schedule.c
index ce1a1fc060..fab357aa06 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -198,6 +198,7 @@ schedule (Capability *initialCapability, Task *task)
bool ready_to_gc;
cap = initialCapability;
+ t = NULL;
// Pre-condition: this task owns initialCapability.
// The sched_mutex is *NOT* held
@@ -301,8 +302,13 @@ schedule (Capability *initialCapability, Task *task)
// Additionally, it is not fatal for the
// threaded RTS to reach here with no threads to run.
//
+ // Since IOPorts have no deadlock avoidance guarantees you may also reach
+ // this point when blocked on an IO Port. If this is the case the only
+ // thing that could unblock it is an I/O event.
+ //
// win32: might be here due to awaitEvent() being abandoned
- // as a result of a console event having been delivered.
+ // as a result of a console event having been delivered or as a result of
+ // waiting on an async I/O to complete with WinIO.
#if defined(THREADED_RTS)
scheduleYield(&cap,task);
@@ -310,9 +316,23 @@ schedule (Capability *initialCapability, Task *task)
if (emptyRunQueue(cap)) continue; // look for work again
#endif
-#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+#if !defined(THREADED_RTS)
if ( emptyRunQueue(cap) ) {
+ /* On the non-threaded RTS if the queue is empty and the last action was
+ blocked on an I/O completion port, then just wait till we're woken
+ up by the RTS with more work. */
+ if (t && t->why_blocked == BlockedOnIOCompletion)
+ {
+ fprintf (stderr, "waiting: %d.\n", t->why_blocked);
+ awaitEvent (emptyRunQueue(cap));
+ fprintf (stderr, "running: %d.\n", t->why_blocked);
+ continue;
+ }
+ continue;
+
+#if !defined(mingw32_HOST_OS)
ASSERT(sched_state >= SCHED_INTERRUPTING);
+#endif
}
#endif
@@ -928,6 +948,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
*/
if (recent_activity != ACTIVITY_INACTIVE) return;
#endif
+ if (task->incall->tso->why_blocked == BlockedOnIOCompletion) return;
debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
@@ -980,6 +1001,10 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
+ case BlockedOnIOCompletion:
+ /* We're blocked waiting for an external I/O call, let's just
+ chill for a bit. */
+ return;
default:
barf("deadlock: main thread blocked in a strange way");
}
@@ -2676,9 +2701,10 @@ initScheduler(void)
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
-#if defined(THREADED_RTS)
+
/* Initialise the mutex and condition variables used by
* the scheduler. */
+#if defined(THREADED_RTS)
initMutex(&sched_mutex);
initMutex(&sync_finished_mutex);
initCondition(&sync_finished_cond);
@@ -3164,6 +3190,11 @@ resurrectThreads (StgTSO *threads)
throwToSingleThreaded(cap, tso,
(StgClosure *)blockedIndefinitelyOnSTM_closure);
break;
+ case BlockedOnIOCompletion:
+ /* I/O Ports may not be reachable by the GC as they may be getting
+ * notified by the RTS. As such this call should be treated as if
+ * it is masking the exception. */
+ continue;
case NotBlocked:
/* This might happen if the thread was blocked on a black hole
* belonging to a thread that we've just woken up (raiseAsync
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 2d8d813464..89ab6e0b4c 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -176,7 +176,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso)
INLINE_HEADER StgTSO *
popRunQueue (Capability *cap)
{
- ASSERT(cap->n_run_queue != 0);
+ ASSERT(cap->n_run_queue > 0);
StgTSO *t = cap->run_queue_hd;
ASSERT(t != END_TSO_QUEUE);
cap->run_queue_hd = t->_link;
diff --git a/rts/Threads.c b/rts/Threads.c
index 22d58bb48b..54c703963e 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -288,6 +288,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
+ case BlockedOnIOCompletion:
case BlockedOnMVar:
case BlockedOnMVarRead:
{
@@ -868,12 +869,16 @@ printThreadBlockage(StgTSO *tso)
debugBelch("is blocked until %ld", (long)(tso->block_info.target));
break;
#endif
+ break;
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnMVarRead:
debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
break;
+ case BlockedOnIOCompletion:
+ debugBelch("is blocked on I/O Completion port @ %p", tso->block_info.closure);
+ break;
case BlockedOnBlackHole:
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
diff --git a/rts/Trace.c b/rts/Trace.c
index b35be3c1e7..7a1f0df768 100644
--- a/rts/Trace.c
+++ b/rts/Trace.c
@@ -164,6 +164,7 @@ static char *thread_stop_reasons[] = {
[6 + BlockedOnSTM] = "blocked on STM",
[6 + BlockedOnDoProc] = "blocked on asyncDoProc",
[6 + BlockedOnCCall] = "blocked on a foreign call",
+ [6 + BlockedOnIOCompletion] = "blocked on I/O Completion port",
[6 + BlockedOnCCall_Interruptible] = "blocked on a foreign call (interruptible)",
[6 + BlockedOnMsgThrowTo] = "blocked on throwTo",
[6 + ThreadMigrating] = "migrating"
diff --git a/rts/TraverseHeap.c b/rts/TraverseHeap.c
index 8bf58c11ee..636737aa0f 100644
--- a/rts/TraverseHeap.c
+++ b/rts/TraverseHeap.c
@@ -1250,6 +1250,7 @@ inner_loop:
traversePushClosure(ts, (StgClosure *) tso->trec, c, child_data);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnMVarRead
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
) {
diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
index 5031c535a1..b1250b77e0 100644
--- a/rts/sm/Compact.c
+++ b/rts/sm/Compact.c
@@ -461,6 +461,7 @@ thread_TSO (StgTSO *tso)
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == NotBlocked
) {
thread_(&tso->block_info.closure);
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index ea64483418..2329b02016 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -618,6 +618,7 @@ checkTSO(StgTSO *tso)
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == NotBlocked
) {
ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure));
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 501d958aae..dd9a96adf8 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -129,6 +129,7 @@ scavengeTSO (StgTSO *tso)
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == BlockedOnIOCompletion
|| tso->why_blocked == NotBlocked
) {
evacuate(&tso->block_info.closure);
diff --git a/utils/genprimopcode/Main.hs b/utils/genprimopcode/Main.hs
index 31d363c0fa..3fe744fec3 100644
--- a/utils/genprimopcode/Main.hs
+++ b/utils/genprimopcode/Main.hs
@@ -919,6 +919,8 @@ ppType (TyApp (TyCon "StableName#") [x]) = "mkStableNamePrimTy " ++ ppType x
ppType (TyApp (TyCon "MVar#") [x,y]) = "mkMVarPrimTy " ++ ppType x
++ " " ++ ppType y
+ppType (TyApp (TyCon "IOPort#") [x,y]) = "mkIOPortPrimTy " ++ ppType x
+ ++ " " ++ ppType y
ppType (TyApp (TyCon "TVar#") [x,y]) = "mkTVarPrimTy " ++ ppType x
++ " " ++ ppType y
ppType (TyApp (TyCon "Void#") []) = "voidPrimTy"