summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libraries/base/GHC/IOPort.hs43
-rw-r--r--rts/Prelude.h3
-rw-r--r--rts/PrimOps.cmm97
-rw-r--r--rts/package.conf.in4
-rw-r--r--rts/rts.cabal.in4
-rw-r--r--rts/win32/libHSbase.def3
6 files changed, 109 insertions, 45 deletions
diff --git a/libraries/base/GHC/IOPort.hs b/libraries/base/GHC/IOPort.hs
index e4890d0989..1905a10fc9 100644
--- a/libraries/base/GHC/IOPort.hs
+++ b/libraries/base/GHC/IOPort.hs
@@ -13,11 +13,13 @@
-- Stability : internal
-- Portability : non-portable (GHC Extensions)
--
--- The IOPort type. This is a synchronization primitive similar to IOVar but
--- without any of the deadlock guarantees that IOVar provides. The ports are
--- single write/multiple wait. Writing to an already full Port will not queue
--- the value but instead will discard it.
+-- The IOPort type. This is a facility used by the Windows IO subsystem.
+-- We have strict rules with an I/O Port:
+-- * writing more than once is an error
+-- * reading more than once is an error
--
+-- It gives us the ability to have one thread to block, wait for a result from
+-- another thread and then being woken up. *Nothing* more.
--
-----------------------------------------------------------------------------
@@ -28,20 +30,41 @@ module GHC.IOPort (
, newEmptyIOPort
, readIOPort
, writeIOPort
+ , doubleReadException
) where
import GHC.Base
+import GHC.IO.Exception
+import GHC.Exception
+import Text.Show
+
+data IOPortException = IOPortException deriving Show
+
+instance Exception IOPortException where
+ displayException IOPortException = "IOPortException"
+
+
+doubleReadException :: SomeException
+doubleReadException = toException IOPortException
data IOPort a = IOPort (IOPort# RealWorld a)
{- ^
An 'IOPort' is a synchronising variable, used
-for communication between concurrent threads, where it one of the threads is
+for communication between concurrent threads, where one of the threads is
controlled by an external state. e.g. by an I/O action that is serviced by the
runtime. It can be thought of as a box, which may be empty or full.
It is mostly similar to the behavior of MVar except writeIOPort doesn't block if
the variable is full and the GC won't forcibly release the lock if it thinks
there's a deadlock.
+
+The properties of IOPorts are:
+* Writing to an empty IOPort will not block.
+* Writing to an full IOPort will not block and throw an exception.
+* Reading from an IOPort for the second time will throw an exception.
+* Reading from a full IOPort will not block, return the value and empty the port.
+* Reading from an empty IOPort will block until a write.
+
-}
-- | @since 4.1.0.0
@@ -49,15 +72,7 @@ instance Eq (IOPort a) where
(IOPort ioport1#) == (IOPort ioport2#) =
isTrue# (sameIOPort# ioport1# ioport2#)
-{-
-M-Vars are rendezvous points for concurrent threads. They begin
-empty, and any attempt to read an empty M-Var blocks. When an M-Var
-is written, a single blocked thread may be freed. Reading an M-Var
-toggles its state from full back to empty. Therefore, any value
-written to an M-Var may only be read once. Multiple reads and writes
-are allowed, but there must be at least one read between any two
-writes.
--}
+
-- |Create an 'IOPort' which is initially empty.
newEmptyIOPort :: IO (IOPort a)
diff --git a/rts/Prelude.h b/rts/Prelude.h
index 0527218da0..d2511b2fc3 100644
--- a/rts/Prelude.h
+++ b/rts/Prelude.h
@@ -43,6 +43,7 @@ PRELUDE_CLOSURE(base_GHCziIOziException_blockedIndefinitelyOnSTM_closure);
PRELUDE_CLOSURE(base_GHCziIOziException_cannotCompactFunction_closure);
PRELUDE_CLOSURE(base_GHCziIOziException_cannotCompactPinned_closure);
PRELUDE_CLOSURE(base_GHCziIOziException_cannotCompactMutable_closure);
+PRELUDE_CLOSURE(base_GHCziIOPort_doubleReadException_closure);
PRELUDE_CLOSURE(base_ControlziExceptionziBase_nonTermination_closure);
PRELUDE_CLOSURE(base_ControlziExceptionziBase_nestedAtomically_closure);
PRELUDE_CLOSURE(base_GHCziEventziThread_blockedOnBadFD_closure);
@@ -109,6 +110,8 @@ PRELUDE_INFO(base_GHCziStable_StablePtr_con_info);
#define cannotCompactMutable_closure DLL_IMPORT_DATA_REF(base_GHCziIOziException_cannotCompactMutable_closure)
#define nonTermination_closure DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nonTermination_closure)
#define nestedAtomically_closure DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nestedAtomically_closure)
+#define doubleReadException DLL_IMPORT_DATA_REF(base_GHCziIOPort_doubleReadException_closure)
+
#define blockedOnBadFD_closure DLL_IMPORT_DATA_REF(base_GHCziEventziThread_blockedOnBadFD_closure)
#define Czh_con_info DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_Czh_con_info)
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 0b1b1419a1..e1f4365963 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -31,6 +31,8 @@ import pthread_mutex_unlock;
#endif
import CLOSURE base_ControlziExceptionziBase_nestedAtomically_closure;
import CLOSURE base_GHCziIOziException_heapOverflow_closure;
+import CLOSURE base_GHCziIOziException_blockedIndefinitelyOnMVar_closure;
+import CLOSURE base_GHCziIOPort_doubleReadException_closure;
import AcquireSRWLockExclusive;
import ReleaseSRWLockExclusive;
import CLOSURE ghczmprim_GHCziTypes_False_closure;
@@ -1593,6 +1595,7 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
// Write barrier before we make the new MVAR_TSO_QUEUE
// visible to other cores.
+ // See Note [Heap memory barriers]
prim_write_barrier;
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
@@ -1761,6 +1764,7 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ //See Note [Heap memory barriers]
prim_write_barrier;
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
@@ -1943,6 +1947,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
*/
if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+ // Add MVar to mutable list
if (info == stg_MVAR_CLEAN_info) {
ccall dirty_MVAR(BaseReg "ptr", mvar "ptr", StgMVar_value(mvar));
}
@@ -1960,6 +1965,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ //See Note [Heap memory barriers]
prim_write_barrier;
StgTSO__link(CurrentTSO) = q;
@@ -2002,10 +2008,8 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
*
* readIOPort & writeIOPort work as follows. Firstly, an important invariant:
*
- * If the IOPort is full, then the request is silently dropped and the
- * message is lost. If the IOPort is empty then the
- * blocking queue contains only the thread blocked on IOPort. An IOPort only
- * supports a single read and a single write to it.
+ * Only one read and one write is allowed for an IOPort.
+ * Reading or writing to the same port twice will throw an exception.
*
* readIOPort:
* IOPort empty : then add ourselves to the blocking queue
@@ -2030,8 +2034,17 @@ stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
* be allowed to interrupt a blocked IOPort just because it thinks there's a
* deadlock. This is especially crucial for the non-threaded runtime.
*
+ * To avoid double reads/writes we set only the head when a reader queues up
+ * on a port. We set the tail to the port itself upon reading. We can do this
+ * since there can only be one reader/writer for the port. In contrast to MVars
+ * which do need to keep a list of blocked threads.
+ *
+ * This allows us to detect any double uses of IOPorts and throw an exception
+ * in that case for easier debugging.
+ *
* -------------------------------------------------------------------------- */
+
stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
{
W_ val, info, tso, q;
@@ -2043,8 +2056,14 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
*/
if (StgMVar_value(ioport) == stg_END_TSO_QUEUE_closure) {
+ // There is already another reader, throw exception.
+ if (StgMVar_head(ioport) != stg_END_TSO_QUEUE_closure) {
+ unlockClosure(ioport, info);
+ jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
+ }
+
if (info == stg_MVAR_CLEAN_info) {
- ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
+ ccall dirty_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport));
}
ALLOC_PRIM_WITH_CUSTOM_FAILURE
@@ -2056,20 +2075,31 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
// readIOPorts are pushed to the front of the queue, so
// they get handled immediately
- SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
StgMVarTSOQueue_link(q) = StgMVar_head(ioport);
StgMVarTSOQueue_tso(q) = CurrentTSO;
+ SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+ //See Note [Heap memory barriers]
+ prim_write_barrier;
+
+ StgMVar_head(ioport) = q;
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = ioport;
StgTSO_why_blocked(CurrentTSO) = BlockedOnIOCompletion::I16;
- StgMVar_head(ioport) = q;
-
- if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(ioport) = q;
- }
+ //Unlocks the closure as well
jump stg_block_readmvar(ioport);
+
+ }
+
+ //Upon reading we set tail = ioport.
+ //This way we can check of there has been a read already.
+ if (StgMVar_tail(ioport) == stg_END_TSO_QUEUE_closure) {
+ StgMVar_tail(ioport) = ioport;
+ } else {
+ //Or another thread has read already: Throw an exception.
+ unlockClosure(ioport, info);
+ jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
}
val = StgMVar_value(ioport);
@@ -2085,23 +2115,31 @@ stg_writeIOPortzh ( P_ ioport, /* :: IOPort a */
LOCK_CLOSURE(ioport, info);
- /* If there is already a value in the queue, then silently ignore the
- second put. TODO: Correct usages of IOPort should never have a second
- put, so perhaps raise an error instead, but I have no idea how to do this
- safely and correctly at this point. */
+ /* If there is already a value in the port, then raise an exception
+ as it's the second write.
+ Correct usages of IOPort should never have a second
+ write. */
if (StgMVar_value(ioport) != stg_END_TSO_QUEUE_closure) {
unlockClosure(ioport, info);
+ jump stg_raiseIOzh(base_GHCziIOPort_doubleReadException_closure);
return (0);
}
+ // We are going to mutate the closure, make sure its current pointers
+ // are marked.
+ if (info == stg_MVAR_CLEAN_info) {
+ ccall update_MVAR(BaseReg "ptr", ioport "ptr", StgMVar_value(ioport) "ptr");
+ }
+
q = StgMVar_head(ioport);
loop:
if (q == stg_END_TSO_QUEUE_closure) {
- /* No further takes, the IOPort is now full. */
+ /* No takes, the IOPort is now full. */
if (info == stg_MVAR_CLEAN_info) {
ccall dirty_MVAR(BaseReg "ptr", ioport "ptr");
}
StgMVar_value(ioport) = val;
+
unlockClosure(ioport, stg_MVAR_DIRTY_info);
return (1);
}
@@ -2111,13 +2149,12 @@ loop:
goto loop;
}
- // There are readIOPort(s) waiting: wake up the first one
-
+ // There is a readIOPort waiting: wake it up
tso = StgMVarTSOQueue_tso(q);
- StgMVar_head(ioport) = StgMVarTSOQueue_link(q);
- if (StgMVar_head(ioport) == stg_END_TSO_QUEUE_closure) {
- StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
- }
+
+ // In contrast to MVars we do not need to move on to the
+ // next element in the waiting list here, as there can only ever
+ // be one thread blocked on a port.
ASSERT(StgTSO_block_info(tso) == ioport);
// save why_blocked here, because waking up the thread destroys
@@ -2130,7 +2167,7 @@ loop:
stack = StgTSO_stackobj(tso);
PerformTake(stack, val);
- // indicate that the MVar operation has now completed.
+ // indicate that the operation has now completed.
StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
if (TO_W_(StgStack_dirty(stack)) == 0) {
@@ -2139,13 +2176,12 @@ loop:
ccall tryWakeupThread(MyCapability() "ptr", tso);
- // If it was a readIOPort, then we can still do work,
- // so loop back. (XXX: This could take a while)
- if (why_blocked == BlockedOnIOCompletion) {
- q = StgMVarTSOQueue_link(q);
- goto loop;
- }
+ // For MVars we loop here, waking up all readers.
+ // IOPorts however can only have on reader. So we are done
+ // at this point.
+ //Either there was no reader queued, or he must have been
+ //blocked on BlockedOnIOCompletion
ASSERT(why_blocked == BlockedOnIOCompletion);
unlockClosure(ioport, info);
@@ -2163,10 +2199,11 @@ stg_newIOPortzh ( gcptr init )
ioport = Hp - SIZEOF_StgMVar + WDS(1);
SET_HDR(ioport, stg_MVAR_DIRTY_info,CCCS);
- // MVARs start dirty: generation 0 has no mutable list
+ // MVARs start dirty: generation 0 has no mutable list
StgMVar_head(ioport) = stg_END_TSO_QUEUE_closure;
StgMVar_tail(ioport) = stg_END_TSO_QUEUE_closure;
StgMVar_value(ioport) = stg_END_TSO_QUEUE_closure;
+
return (ioport);
}
diff --git a/rts/package.conf.in b/rts/package.conf.in
index a03eff6a14..d59d0ce641 100644
--- a/rts/package.conf.in
+++ b/rts/package.conf.in
@@ -97,6 +97,8 @@ ld-options:
, "-Wl,-u,_base_GHCziIOziException_cannotCompactFunction_closure"
, "-Wl,-u,_base_GHCziIOziException_cannotCompactPinned_closure"
, "-Wl,-u,_base_GHCziIOziException_cannotCompactMutable_closure"
+ , "-Wl,-u,_base_GHCziIOPort_doubleReadException_closure"
+ , "-Wl,-u,_base_ControlziExceptionziBase_absentSumFieldError_closure"
, "-Wl,-u,_base_ControlziExceptionziBase_nonTermination_closure"
, "-Wl,-u,_base_ControlziExceptionziBase_nestedAtomically_closure"
, "-Wl,-u,_base_GHCziEventziThread_blockedOnBadFD_closure"
@@ -210,6 +212,8 @@ ld-options:
, "-Wl,-u,base_GHCziIOziException_cannotCompactFunction_closure"
, "-Wl,-u,base_GHCziIOziException_cannotCompactPinned_closure"
, "-Wl,-u,base_GHCziIOziException_cannotCompactMutable_closure"
+ , "-Wl,-u,base_GHCziIOPort_doubleReadException_closure"
+ , "-Wl,-u,base_ControlziExceptionziBase_absentSumFieldError_closure"
, "-Wl,-u,base_ControlziExceptionziBase_nonTermination_closure"
, "-Wl,-u,base_ControlziExceptionziBase_nestedAtomically_closure"
, "-Wl,-u,base_GHCziEventziThread_blockedOnBadFD_closure"
diff --git a/rts/rts.cabal.in b/rts/rts.cabal.in
index a16582a7f5..a890d3dc1d 100644
--- a/rts/rts.cabal.in
+++ b/rts/rts.cabal.in
@@ -227,6 +227,8 @@ library
"-Wl,-u,_base_GHCziIOziException_cannotCompactFunction_closure"
"-Wl,-u,_base_GHCziIOziException_cannotCompactPinned_closure"
"-Wl,-u,_base_GHCziIOziException_cannotCompactMutable_closure"
+ "-Wl,-u,_base_GHCziIOPort_doubleReadException_closure"
+ "-Wl,-u,_base_ControlziExceptionziBase_absentSumFieldError_closure"
"-Wl,-u,_base_ControlziExceptionziBase_nonTermination_closure"
"-Wl,-u,_base_ControlziExceptionziBase_nestedAtomically_closure"
"-Wl,-u,_base_GHCziEventziThread_blockedOnBadFD_closure"
@@ -307,6 +309,8 @@ library
"-Wl,-u,base_GHCziIOziException_cannotCompactFunction_closure"
"-Wl,-u,base_GHCziIOziException_cannotCompactPinned_closure"
"-Wl,-u,base_GHCziIOziException_cannotCompactMutable_closure"
+ "-Wl,-u,base_GHCziIOPort_doubleReadException_closure"
+ "-Wl,-u,base_ControlziExceptionziBase_absentSumFieldError_closure"
"-Wl,-u,base_ControlziExceptionziBase_nonTermination_closure"
"-Wl,-u,base_ControlziExceptionziBase_nestedAtomically_closure"
"-Wl,-u,base_GHCziEventziThread_blockedOnBadFD_closure"
diff --git a/rts/win32/libHSbase.def b/rts/win32/libHSbase.def
index fb705bbd9f..733fffb5df 100644
--- a/rts/win32/libHSbase.def
+++ b/rts/win32/libHSbase.def
@@ -43,7 +43,8 @@ EXPORTS
base_GHCziIOziException_cannotCompactFunction_closure
base_GHCziIOziException_cannotCompactPinned_closure
base_GHCziIOziException_cannotCompactMutable_closure
-
+ base_GHCziIOPort_doubleReadException_closure
+ base_ControlziExceptionziBase_absentSumFieldError_closure
base_ControlziExceptionziBase_nonTermination_closure
base_ControlziExceptionziBase_nestedAtomically_closure
base_GHCziEventziThread_blockedOnBadFD_closure