Diffstat (limited to 'rts/PrimOps.cmm')
-rw-r--r--  rts/PrimOps.cmm | 523 ++++++++++++++++++++++++++++++----------------------
1 file changed, 300 insertions(+), 223 deletions(-)
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index f4e80e9c35..ced15eec99 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -160,16 +160,16 @@ stg_newArrayzh ( W_ n /* words */, gcptr init )
     p = arr + SIZEOF_StgMutArrPtrs;
   for:
     if (p < arr + WDS(words)) {
-	W_[p] = init;
-	p = p + WDS(1);
-	goto for;
+        W_[p] = init;
+        p = p + WDS(1);
+        goto for;
     }
     // Initialise the mark bits with 0
   for2:
     if (p < arr + WDS(size)) {
-	W_[p] = 0;
-	p = p + WDS(1);
-	goto for2;
+        W_[p] = 0;
+        p = p + WDS(1);
+        goto for2;
     }
 
     return (arr);
@@ -179,11 +179,11 @@ stg_unsafeThawArrayzh ( gcptr arr )
 {
   // SUBTLETY TO DO WITH THE OLD GEN MUTABLE LIST
   //
-  // A MUT_ARR_PTRS lives on the mutable list, but a MUT_ARR_PTRS_FROZEN 
+  // A MUT_ARR_PTRS lives on the mutable list, but a MUT_ARR_PTRS_FROZEN
   // normally doesn't.  However, when we freeze a MUT_ARR_PTRS, we leave
   // it on the mutable list for the GC to remove (removing something from
   // the mutable list is not easy).
-  // 
+  //
   // So that we can tell whether a MUT_ARR_PTRS_FROZEN is on the mutable list,
   // when we freeze it we set the info ptr to be MUT_ARR_PTRS_FROZEN0
   // to indicate that it is still on the mutable list.
@@ -198,11 +198,11 @@ stg_unsafeThawArrayzh ( gcptr arr )
   if (StgHeader_info(arr) != stg_MUT_ARR_PTRS_FROZEN0_info) {
         SET_INFO(arr,stg_MUT_ARR_PTRS_DIRTY_info);
         recordMutable(arr);
-	// must be done after SET_INFO, because it ASSERTs closure_MUTABLE()
-	return (arr);
+        // must be done after SET_INFO, because it ASSERTs closure_MUTABLE()
+        return (arr);
   } else {
-	SET_INFO(arr,stg_MUT_ARR_PTRS_DIRTY_info);
-	return (arr);
+        SET_INFO(arr,stg_MUT_ARR_PTRS_DIRTY_info);
+        return (arr);
   }
 }
 
@@ -229,16 +229,16 @@ stg_newArrayArrayzh ( W_ n /* words */ )
     p = arr + SIZEOF_StgMutArrPtrs;
   for:
     if (p < arr + WDS(words)) {
-	W_[p] = arr;
-	p = p + WDS(1);
-	goto for;
+        W_[p] = arr;
+        p = p + WDS(1);
+        goto for;
     }
     // Initialise the mark bits with 0
   for2:
     if (p < arr + WDS(size)) {
-	W_[p] = 0;
-	p = p + WDS(1);
-	goto for2;
+        W_[p] = 0;
+        p = p + WDS(1);
+        goto for2;
     }
 
     return (arr);
@@ -258,7 +258,7 @@ stg_newMutVarzh ( gcptr init )
     mv = Hp - SIZEOF_StgMutVar + WDS(1);
     SET_HDR(mv,stg_MUT_VAR_DIRTY_info,CCCS);
     StgMutVar_var(mv) = init;
-    
+
     return (mv);
 }
 
@@ -283,19 +283,19 @@ stg_atomicModifyMutVarzh ( gcptr mv, gcptr f )
 {
     W_ z, x, y, r, h;
 
-    /* If x is the current contents of the MutVar#, then 
+    /* If x is the current contents of the MutVar#, then
        We want to make the new contents point to
 
          (sel_0 (f x))
- 
+
        and the return value is
-	
-	 (sel_1 (f x))
+
+         (sel_1 (f x))
 
         obviously we can share (f x).
 
          z = [stg_ap_2 f x]  (max (HS + 2) MIN_UPD_SIZE)
-	 y = [stg_sel_0 z]   (max (HS + 1) MIN_UPD_SIZE)
+         y = [stg_sel_0 z]   (max (HS + 1) MIN_UPD_SIZE)
          r = [stg_sel_1 z]   (max (HS + 1) MIN_UPD_SIZE)
     */
@@ -374,18 +374,14 @@ stg_mkWeakzh ( gcptr key,
     w = Hp - SIZEOF_StgWeak + WDS(1);
     SET_HDR(w, stg_WEAK_info, CCCS);
 
-    // We don't care about cfinalizer here.
-    // Should StgWeak_cfinalizer(w) be stg_NO_FINALIZER_closure or
-    // something else?
-
-    StgWeak_key(w)        = key;
-    StgWeak_value(w)      = value;
-    StgWeak_finalizer(w)  = finalizer;
-    StgWeak_cfinalizer(w) = stg_NO_FINALIZER_closure;
+    StgWeak_key(w)         = key;
+    StgWeak_value(w)       = value;
+    StgWeak_finalizer(w)   = finalizer;
+    StgWeak_cfinalizers(w) = stg_NO_FINALIZER_closure;
 
     ACQUIRE_LOCK(sm_mutex);
-    StgWeak_link(w)   = W_[weak_ptr_list];
-    W_[weak_ptr_list] = w;
+    StgWeak_link(w) = generation_weak_ptr_list(W_[g0]);
+    generation_weak_ptr_list(W_[g0]) = w;
     RELEASE_LOCK(sm_mutex);
 
     IF_DEBUG(weak, ccall debugBelch(stg_weak_msg,w));
@@ -398,61 +394,62 @@ stg_mkWeakNoFinalizzerzh ( gcptr key, gcptr value )
     jump stg_mkWeakzh (key, value, stg_NO_FINALIZER_closure);
 }
 
-stg_mkWeakForeignEnvzh ( gcptr key,
-                         gcptr val,
-                         W_ fptr,   // finalizer
-                         W_ ptr,
-                         W_ flag,   // has environment (0 or 1)
-                         W_ eptr )
+STRING(stg_cfinalizer_msg,"Adding a finalizer to %p\n")
+
+stg_addCFinalizzerToWeakzh ( W_ fptr,   // finalizer
+                             W_ ptr,
+                             W_ flag,   // has environment (0 or 1)
+                             W_ eptr,
+                             gcptr w )
 {
-    W_ payload_words, words;
-    gcptr w, p;
+    W_ c, info;
 
-    ALLOC_PRIM (SIZEOF_StgWeak);
+    LOCK_CLOSURE(w, info);
 
-    w = Hp - SIZEOF_StgWeak + WDS(1);
-    SET_HDR(w, stg_WEAK_info, CCCS);
+    if (info == stg_DEAD_WEAK_info) {
+        // Already dead.
+        unlockClosure(w, info);
+        return (0);
+    }
 
-    payload_words = 4;
-    words = BYTES_TO_WDS(SIZEOF_StgArrWords) + payload_words;
-    ("ptr" p) = ccall allocate(MyCapability() "ptr", words);
+    ALLOC_PRIM (SIZEOF_StgCFinalizerList)
 
-    TICK_ALLOC_PRIM(SIZEOF_StgArrWords,WDS(payload_words),0);
-    SET_HDR(p, stg_ARR_WORDS_info, CCCS);
+    c = Hp - SIZEOF_StgCFinalizerList + WDS(1);
+    SET_HDR(c, stg_C_FINALIZER_LIST_info, CCCS);
 
-    StgArrWords_bytes(p) = WDS(payload_words);
-    StgArrWords_payload(p,0) = fptr;
-    StgArrWords_payload(p,1) = ptr;
-    StgArrWords_payload(p,2) = eptr;
-    StgArrWords_payload(p,3) = flag;
+    StgCFinalizerList_fptr(c) = fptr;
+    StgCFinalizerList_ptr(c)  = ptr;
+    StgCFinalizerList_eptr(c) = eptr;
+    StgCFinalizerList_flag(c) = flag;
 
-    // We don't care about the value here.
-    // Should StgWeak_value(w) be stg_NO_FINALIZER_closure or something else?
+    StgCFinalizerList_link(c) = StgWeak_cfinalizers(w);
+    StgWeak_cfinalizers(w) = c;
 
-    StgWeak_key(w)        = key;
-    StgWeak_value(w)      = val;
-    StgWeak_finalizer(w)  = stg_NO_FINALIZER_closure;
-    StgWeak_cfinalizer(w) = p;
+    unlockClosure(w, info);
 
-    ACQUIRE_LOCK(sm_mutex);
-    StgWeak_link(w)   = W_[weak_ptr_list];
-    W_[weak_ptr_list] = w;
-    RELEASE_LOCK(sm_mutex);
+    recordMutable(w);
 
-    IF_DEBUG(weak, ccall debugBelch(stg_weak_msg,w));
+    IF_DEBUG(weak, ccall debugBelch(stg_cfinalizer_msg,w));
 
-    return (w);
+    return (1);
 }
 
 stg_finalizzeWeakzh ( gcptr w )
 {
-    gcptr f, arr;
+    gcptr f, list;
+    W_ info;
+
+    LOCK_CLOSURE(w, info);
 
     // already dead?
-    if (GET_INFO(w) == stg_DEAD_WEAK_info) {
+    if (info == stg_DEAD_WEAK_info) {
+        unlockClosure(w, info);
         return (0,stg_NO_FINALIZER_closure);
     }
 
+    f    = StgWeak_finalizer(w);
+    list = StgWeak_cfinalizers(w);
+
     // kill it
 #ifdef PROFILING
     // @LDV profiling
@@ -461,7 +458,7 @@ stg_finalizzeWeakzh ( gcptr w )
     //    LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)w);
     // or, LDV_recordDead():
     //    LDV_recordDead((StgClosure *)w, sizeofW(StgWeak) - sizeofW(StgProfHeader));
-    // Furthermore, when PROFILING is turned on, dead weak pointers are exactly as 
+    // Furthermore, when PROFILING is turned on, dead weak pointers are exactly as
     // large as weak pointers, so there is no need to fill the slop, either.
     // See stg_DEAD_WEAK_info in StgMiscClosures.hc.
 #endif
@@ -469,19 +466,12 @@ stg_finalizzeWeakzh ( gcptr w )
     //
     // Todo: maybe use SET_HDR() and remove LDV_recordCreate()?
     //
-    SET_INFO(w,stg_DEAD_WEAK_info);
-    LDV_RECORD_CREATE(w);
-
-    f = StgWeak_finalizer(w);
-    arr = StgWeak_cfinalizer(w);
+    unlockClosure(w, stg_DEAD_WEAK_info);
 
-    StgDeadWeak_link(w) = StgWeak_link(w);
+    LDV_RECORD_CREATE(w);
 
-    if (arr != stg_NO_FINALIZER_closure) {
-	ccall runCFinalizer(StgArrWords_payload(arr,0),
-                            StgArrWords_payload(arr,1),
-                            StgArrWords_payload(arr,2),
-                            StgArrWords_payload(arr,3));
+    if (list != stg_NO_FINALIZER_closure) {
+      ccall runCFinalizers(list);
     }
 
     /* return the finalizer */
@@ -494,10 +484,21 @@ stg_finalizzeWeakzh ( gcptr w )
 
 stg_deRefWeakzh ( gcptr w )
 {
-    W_ code;
+    W_ code, info;
     gcptr val;
 
-    if (GET_INFO(w) == stg_WEAK_info) {
+    info = GET_INFO(w);
+
+    if (info == stg_WHITEHOLE_info) {
+        // w is locked by another thread. Now it's not immediately clear if w is
+        // alive or not. We use lockClosure to wait for the info pointer to become
+        // something other than stg_WHITEHOLE_info.
+
+        LOCK_CLOSURE(w, info);
+        unlockClosure(w, info);
+    }
+
+    if (info == stg_WEAK_info) {
         code = 1;
         val = StgWeak_value(w);
     } else {
@@ -512,7 +513,7 @@ stg_deRefWeakzh ( gcptr w )
    -------------------------------------------------------------------------- */
 
 stg_decodeFloatzuIntzh ( F_ arg )
-{ 
+{
     W_ p;
     W_ mp_tmp1;
     W_ mp_tmp_w;
@@ -521,16 +522,16 @@ stg_decodeFloatzuIntzh ( F_ arg )
 
     mp_tmp1  = Sp - WDS(1);
     mp_tmp_w = Sp - WDS(2);
-    
+
     /* Perform the operation */
     ccall __decodeFloat_Int(mp_tmp1 "ptr", mp_tmp_w "ptr", arg);
-    
+
    /* returns: (Int# (mantissa), Int# (exponent)) */
     return (W_[mp_tmp1], W_[mp_tmp_w]);
 }
 
 stg_decodeDoublezu2Intzh ( D_ arg )
-{ 
+{
     W_ p;
     W_ mp_tmp1;
     W_ mp_tmp2;
@@ -564,13 +565,13 @@ stg_forkzh ( gcptr closure )
 
     gcptr threadid;
 
-    ("ptr" threadid) = ccall createIOThread( MyCapability() "ptr", 
-				  RtsFlags_GcFlags_initialStkSize(RtsFlags), 
+    ("ptr" threadid) = ccall createIOThread( MyCapability() "ptr",
+                                  RtsFlags_GcFlags_initialStkSize(RtsFlags),
                                   closure "ptr");
 
     /* start blocked if the current thread is blocked */
     StgTSO_flags(threadid) = %lobits16(
-	TO_W_(StgTSO_flags(threadid)) | 
+        TO_W_(StgTSO_flags(threadid)) |
         TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
     ccall scheduleThread(MyCapability() "ptr", threadid "ptr");
@@ -578,7 +579,7 @@ stg_forkzh ( gcptr closure )
     // context switch soon, but not immediately: we don't want every
     // forkIO to force a context-switch.
     Capability_context_switch(MyCapability()) = 1 :: CInt;
-    
+
     return (threadid);
 }
 
@@ -588,13 +589,13 @@ again: MAYBE_GC(again);
 
     gcptr threadid;
 
-    ("ptr" threadid) = ccall createIOThread( MyCapability() "ptr", 
-				  RtsFlags_GcFlags_initialStkSize(RtsFlags), 
+    ("ptr" threadid) = ccall createIOThread( MyCapability() "ptr",
+                                  RtsFlags_GcFlags_initialStkSize(RtsFlags),
                                   closure "ptr");
 
     /* start blocked if the current thread is blocked */
     StgTSO_flags(threadid) = %lobits16(
-	TO_W_(StgTSO_flags(threadid)) | 
+        TO_W_(StgTSO_flags(threadid)) |
         TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
     ccall scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr");
@@ -602,7 +603,7 @@ again: MAYBE_GC(again);
     // context switch soon, but not immediately: we don't want every
     // forkIO to force a context-switch.
     Capability_context_switch(MyCapability()) = 1 :: CInt;
-    
+
     return (threadid);
 }
 
@@ -1014,7 +1015,7 @@ retry_pop_stack:
       }
   }
 
-  // We've reached the ATOMICALLY_FRAME: attempt to wait 
+  // We've reached the ATOMICALLY_FRAME: attempt to wait
   ASSERT(frame_type == ATOMICALLY_FRAME);
   if (outer != NO_TREC) {
     // We called retry while checking invariants, so abort the current
@@ -1152,9 +1153,9 @@ stg_writeTVarzh (P_ tvar,     /* :: TVar a */
 stg_isEmptyMVarzh ( P_ mvar /* :: MVar a */ )
 {
     if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
-	return (1);
+        return (1);
     } else {
-	return (0);
+        return (0);
     }
 }
 
@@ -1163,7 +1164,7 @@ stg_newMVarzh ()
     W_ mvar;
 
     ALLOC_PRIM_ (SIZEOF_StgMVar, stg_newMVarzh);
-  
+
     mvar = Hp - SIZEOF_StgMVar + WDS(1);
     SET_HDR(mvar,stg_MVAR_DIRTY_info,CCCS);
         // MVARs start dirty: generation 0 has no mutable list
@@ -1191,21 +1192,16 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
 {
     W_ val, info, tso, q;
 
-#if defined(THREADED_RTS)
-    ("ptr" info) = ccall lockClosure(mvar "ptr");
-#else
-    info = GET_INFO(mvar);
-#endif
-
-    if (info == stg_MVAR_CLEAN_info) {
-        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
-    }
+    LOCK_CLOSURE(mvar, info);
 
     /* If the MVar is empty, put ourselves on its blocking queue,
      * and wait until we're woken up.
     */
     if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
-        
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        }
+
        // We want to put the heap check down here in the slow path,
        // but be careful to unlock the closure before returning to
        // the RTS if the check fails.
@@ -1220,30 +1216,32 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
         StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
         StgMVarTSOQueue_tso(q)  = CurrentTSO;
 
-	if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-	    StgMVar_head(mvar) = q;
-	} else {
+        if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+            StgMVar_head(mvar) = q;
+        } else {
            StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
            ccall recordClosureMutated(MyCapability() "ptr",
                                              StgMVar_tail(mvar));
-	}
-	StgTSO__link(CurrentTSO)       = q;
-	StgTSO_block_info(CurrentTSO)  = mvar;
-	StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
-	StgMVar_tail(mvar) = q;
-	
+        }
+        StgTSO__link(CurrentTSO)       = q;
+        StgTSO_block_info(CurrentTSO)  = mvar;
+        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+        StgMVar_tail(mvar) = q;
+
         jump stg_block_takemvar(mvar);
     }
-    
+
     /* we got the value... */
     val = StgMVar_value(mvar);
-    
+
     q = StgMVar_head(mvar);
 loop:
     if (q == stg_END_TSO_QUEUE_closure) {
         /* No further putMVars, MVar is now empty */
         StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-        unlockClosure(mvar, stg_MVAR_DIRTY_info);
+        // If the MVar is not already dirty, then we don't need to make
+        // it dirty, as it is empty with nothing blocking on it.
+        unlockClosure(mvar, info);
         return (val);
     }
     if (StgHeader_info(q) == stg_IND_info ||
@@ -1251,9 +1249,13 @@ loop:
         q = StgInd_indirectee(q);
         goto loop;
     }
-    
+
     // There are putMVar(s) waiting... wake up the first thread on the queue
-    
+
+    if (info == stg_MVAR_CLEAN_info) {
+        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+    }
+
     tso = StgMVarTSOQueue_tso(q);
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
@@ -1270,11 +1272,11 @@ loop:
 
     // indicate that the MVar operation has now completed.
     StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
-    
+
     // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
-    
+
     unlockClosure(mvar, stg_MVAR_DIRTY_info);
     return (val);
 }
 
@@ -1283,48 +1285,43 @@ stg_tryTakeMVarzh ( P_ mvar /* :: MVar a */ )
 {
     W_ val, info, tso, q;
 
-#if defined(THREADED_RTS)
-    ("ptr" info) = ccall lockClosure(mvar "ptr");
-#else
-    info = GET_INFO(mvar);
-#endif
-
-    /* If the MVar is empty, put ourselves on its blocking queue,
-     * and wait until we're woken up.
-     */
+    LOCK_CLOSURE(mvar, info);
+
+    /* If the MVar is empty, return 0. */
     if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
 #if defined(THREADED_RTS)
         unlockClosure(mvar, info);
 #endif
-	/* HACK: we need a pointer to pass back, 
-	 * so we abuse NO_FINALIZER_closure
-	 */
-	return (0, stg_NO_FINALIZER_closure);
-    }
-
-    if (info == stg_MVAR_CLEAN_info) {
-        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        /* HACK: we need a pointer to pass back,
+         * so we abuse NO_FINALIZER_closure
+         */
+        return (0, stg_NO_FINALIZER_closure);
     }
 
     /* we got the value... */
     val = StgMVar_value(mvar);
-    
+
     q = StgMVar_head(mvar);
 loop:
     if (q == stg_END_TSO_QUEUE_closure) {
         /* No further putMVars, MVar is now empty */
         StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-        unlockClosure(mvar, stg_MVAR_DIRTY_info);
+        unlockClosure(mvar, info);
        return (1, val);
    }
+
     if (StgHeader_info(q) == stg_IND_info ||
         StgHeader_info(q) == stg_MSG_NULL_info) {
         q = StgInd_indirectee(q);
         goto loop;
     }
-    
+
     // There are putMVar(s) waiting... wake up the first thread on the queue
-    
+
+    if (info == stg_MVAR_CLEAN_info) {
+        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+    }
+
     tso = StgMVarTSOQueue_tso(q);
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
@@ -1341,11 +1338,11 @@ loop:
 
     // indicate that the MVar operation has now completed.
     StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
-    
+
     // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
-    
+
     unlockClosure(mvar, stg_MVAR_DIRTY_info);
     return (1,val);
 }
 
@@ -1355,18 +1352,14 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
 {
     W_ info, tso, q;
 
-#if defined(THREADED_RTS)
-    ("ptr" info) = ccall lockClosure(mvar "ptr");
-#else
-    info = GET_INFO(mvar);
-#endif
-
-    if (info == stg_MVAR_CLEAN_info) {
-        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
-    }
+    LOCK_CLOSURE(mvar, info);
 
     if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
 
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        }
+
         // We want to put the heap check down here in the slow path,
         // but be careful to unlock the closure before returning to
         // the RTS if the check fails.
@@ -1381,27 +1374,30 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
         StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
         StgMVarTSOQueue_tso(q)  = CurrentTSO;
 
-	if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-	    StgMVar_head(mvar) = q;
-	} else {
+        if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+            StgMVar_head(mvar) = q;
+        } else {
            StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
            ccall recordClosureMutated(MyCapability() "ptr",
                                              StgMVar_tail(mvar));
-	}
-	StgTSO__link(CurrentTSO)       = q;
-	StgTSO_block_info(CurrentTSO)  = mvar;
-	StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
-	StgMVar_tail(mvar) = q;
+        }
+        StgTSO__link(CurrentTSO)       = q;
+        StgTSO_block_info(CurrentTSO)  = mvar;
+        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+        StgMVar_tail(mvar) = q;
 
         jump stg_block_putmvar(mvar,val);
     }
-  
+
     q = StgMVar_head(mvar);
 loop:
     if (q == stg_END_TSO_QUEUE_closure) {
-	/* No further takes, the MVar is now full. */
-	StgMVar_value(mvar) = val;
-	unlockClosure(mvar, stg_MVAR_DIRTY_info);
+        /* No further takes, the MVar is now full. */
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        }
+        StgMVar_value(mvar) = val;
+        unlockClosure(mvar, stg_MVAR_DIRTY_info);
         return ();
     }
     if (StgHeader_info(q) == stg_IND_info ||
@@ -1410,16 +1406,19 @@ loop:
         goto loop;
     }
 
-    // There are takeMVar(s) waiting: wake up the first one
-    
+    // There are readMVar/takeMVar(s) waiting: wake up the first one
+
     tso = StgMVarTSOQueue_tso(q);
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
 
     // actually perform the takeMVar
     W_ stack;
@@ -1432,10 +1431,19 @@ loop:
     if (TO_W_(StgStack_dirty(stack)) == 0) {
         ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
     }
-    
+
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
-    unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    // If it was an readMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
+    unlockClosure(mvar, info);
     return ();
 }
 
@@ -1445,29 +1453,25 @@ stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */
 {
     W_ info, tso, q;
 
-#if defined(THREADED_RTS)
-    ("ptr" info) = ccall lockClosure(mvar "ptr");
-#else
-    info = GET_INFO(mvar);
-#endif
+    LOCK_CLOSURE(mvar, info);
 
     if (StgMVar_value(mvar) != stg_END_TSO_QUEUE_closure) {
 #if defined(THREADED_RTS)
-	unlockClosure(mvar, info);
+        unlockClosure(mvar, info);
 #endif
-	return (0);
-    }
-  
-    if (info == stg_MVAR_CLEAN_info) {
-        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        return (0);
     }
 
     q = StgMVar_head(mvar);
 loop:
     if (q == stg_END_TSO_QUEUE_closure) {
-	/* No further takes, the MVar is now full. */
-	StgMVar_value(mvar) = val;
-	unlockClosure(mvar, stg_MVAR_DIRTY_info);
+        /* No further takes, the MVar is now full. */
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        }
+
+        StgMVar_value(mvar) = val;
+        unlockClosure(mvar, stg_MVAR_DIRTY_info);
         return (1);
     }
     if (StgHeader_info(q) == stg_IND_info ||
@@ -1477,15 +1481,18 @@ loop:
         goto loop;
     }
 
     // There are takeMVar(s) waiting: wake up the first one
-    
+
     tso = StgMVarTSOQueue_tso(q);
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
 
     // actually perform the takeMVar
     W_ stack;
@@ -1494,17 +1501,87 @@ loop:
 
     // indicate that the MVar operation has now completed.
     StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
-    
+
     if (TO_W_(StgStack_dirty(stack)) == 0) {
         ccall dirty_STACK(MyCapability() "ptr", stack "ptr");
     }
-    
+
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
-    unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    // If it was an readMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
+    unlockClosure(mvar, info);
     return (1);
 }
 
+stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
+{
+    W_ val, info, tso, q;
+
+    LOCK_CLOSURE(mvar, info);
+
+    /* If the MVar is empty, put ourselves on the blocked readers
+     * list and wait until we're woken up.
+     */
+    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+        if (info == stg_MVAR_CLEAN_info) {
+            ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+        }
+
+        ALLOC_PRIM_WITH_CUSTOM_FAILURE
+            (SIZEOF_StgMVarTSOQueue,
+             unlockClosure(mvar, stg_MVAR_DIRTY_info);
+             GC_PRIM_P(stg_readMVarzh, mvar));
+
+        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+        // readMVars are pushed to the front of the queue, so
+        // they get handled immediately
+        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+        StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
+        StgMVarTSOQueue_tso(q)  = CurrentTSO;
+
+        StgTSO__link(CurrentTSO)       = q;
+        StgTSO_block_info(CurrentTSO)  = mvar;
+        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
+        StgMVar_head(mvar) = q;
+
+        if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
+            StgMVar_tail(mvar) = q;
+        }
+
+        jump stg_block_readmvar(mvar);
+    }
+
+    val = StgMVar_value(mvar);
+
+    unlockClosure(mvar, info);
+    return (val);
+}
+
+stg_tryReadMVarzh ( P_ mvar, /* :: MVar a */ )
+{
+    W_ val, info, tso, q;
+
+    LOCK_CLOSURE(mvar, info);
+
+    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+        return (0, stg_NO_FINALIZER_closure);
+    }
+
+    val = StgMVar_value(mvar);
+
+    unlockClosure(mvar, info);
+    return (1, val);
+}
 
 /* -----------------------------------------------------------------------------
    Stable pointer primitives
   -------------------------------------------------------------------------- */
@@ -1566,23 +1643,23 @@ stg_newBCOzh ( P_ instrs,
 
     bco = Hp - bytes + WDS(1);
     SET_HDR(bco, stg_BCO_info, CCCS);
-    
+
     StgBCO_instrs(bco)   = instrs;
     StgBCO_literals(bco) = literals;
     StgBCO_ptrs(bco)     = ptrs;
     StgBCO_arity(bco)    = HALF_W_(arity);
     StgBCO_size(bco)     = HALF_W_(words);
-    
+
     // Copy the arity/bitmap info into the BCO
     W_ i;
     i = 0;
 for:
     if (i < BYTE_ARR_WDS(bitmap_arr)) {
-	StgBCO_bitmap(bco,i) = StgArrWords_payload(bitmap_arr,i);
-	i = i + 1;
-	goto for;
+        StgBCO_bitmap(bco,i) = StgArrWords_payload(bitmap_arr,i);
+        i = i + 1;
+        goto for;
     }
-    
+
     return (bco);
 }
 
@@ -1602,10 +1679,10 @@ stg_mkApUpd0zh ( P_ bco )
 
     ap = Hp - SIZEOF_StgAP + WDS(1);
     SET_HDR(ap, stg_AP_info, CCCS);
-    
+
     StgAP_n_args(ap) = HALF_W_(0);
     StgAP_fun(ap) = bco;
-    
+
     return (ap);
 }
 
@@ -1625,14 +1702,14 @@ stg_unpackClosurezh ( P_ closure )
         nptrs = 0;
         goto out;
     }
-    case THUNK, THUNK_1_0, THUNK_0_1, THUNK_2_0, THUNK_1_1, 
+    case THUNK, THUNK_1_0, THUNK_0_1, THUNK_2_0, THUNK_1_1,
          THUNK_0_2, THUNK_STATIC, AP, PAP, AP_STACK, BCO : {
         ptrs  = 0;
         nptrs = 0;
         goto out;
     }
     default: {
-        ptrs  = TO_W_(%INFO_PTRS(info)); 
+        ptrs  = TO_W_(%INFO_PTRS(info));
         nptrs = TO_W_(%INFO_NPTRS(info));
         goto out;
     }}
@@ -1658,22 +1735,22 @@ out:
 
     p = 0;
 for:
     if(p < ptrs) {
-         W_[ptrs_arr + SIZEOF_StgMutArrPtrs + WDS(p)] = StgClosure_payload(clos,p);
-	 p = p + 1;
-	 goto for;
+        W_[ptrs_arr + SIZEOF_StgMutArrPtrs + WDS(p)] = StgClosure_payload(clos,p);
+        p = p + 1;
+        goto for;
     }
     /* We can leave the card table uninitialised, since the array is
       allocated in the nursery.  The GC will fill it in if/when the array
       is promoted. */
-    
+
     SET_HDR(nptrs_arr, stg_ARR_WORDS_info, CCCS);
     StgArrWords_bytes(nptrs_arr) = WDS(nptrs);
     p = 0;
 for2:
     if(p < nptrs) {
-	 W_[BYTE_ARR_CTS(nptrs_arr) + WDS(p)] = StgClosure_payload(clos, p+ptrs);
-	 p = p + 1;
-	 goto for2;
+        W_[BYTE_ARR_CTS(nptrs_arr) + WDS(p)] = StgClosure_payload(clos, p+ptrs);
+        p = p + 1;
+        goto for2;
     }
     return (info, ptrs_arr, nptrs_arr);
 }
@@ -1685,13 +1762,13 @@ for2:
 /* Add a thread to the end of the blocked queue. (C-- version of the C
  * macro in Schedule.h).
  */
-#define APPEND_TO_BLOCKED_QUEUE(tso)			\
-    ASSERT(StgTSO__link(tso) == END_TSO_QUEUE);		\
-    if (W_[blocked_queue_hd] == END_TSO_QUEUE) {	\
-      W_[blocked_queue_hd] = tso;			\
-    } else {						\
+#define APPEND_TO_BLOCKED_QUEUE(tso)                    \
+    ASSERT(StgTSO__link(tso) == END_TSO_QUEUE);         \
+    if (W_[blocked_queue_hd] == END_TSO_QUEUE) {        \
+        W_[blocked_queue_hd] = tso;                     \
+    } else {                                            \
       ccall setTSOLink(MyCapability() "ptr", W_[blocked_queue_tl] "ptr", tso); \
-    }							\
+    }                                                   \
     W_[blocked_queue_tl] = tso;
 
 stg_waitReadzh ( W_ fd )
@@ -1748,7 +1825,7 @@ stg_delayzh ( W_ us_delay )
 
     /* could probably allocate this on the heap instead */
     ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
-					stg_delayzh_malloc_str);
+                                        stg_delayzh_malloc_str);
     (reqID) = ccall addDelayRequest(us_delay);
     StgAsyncIOResult_reqID(ares)   = reqID;
     StgAsyncIOResult_len(ares)     = 0;
@@ -1775,14 +1852,14 @@ stg_delayzh ( W_ us_delay )
     t = W_[sleeping_queue];
 while:
     if (t != END_TSO_QUEUE && StgTSO_block_info(t) < target) {
-	prev = t;
-	t = StgTSO__link(t);
-	goto while;
+        prev = t;
+        t = StgTSO__link(t);
+        goto while;
     }
 
     StgTSO__link(CurrentTSO) = t;
     if (prev == NULL) {
-	W_[sleeping_queue] = CurrentTSO;
+        W_[sleeping_queue] = CurrentTSO;
     } else {
         ccall setTSOLink(MyCapability() "ptr", prev "ptr", CurrentTSO);
     }
@@ -1896,7 +1973,7 @@ stg_asyncDoProczh ( W_ proc, W_ param )
 *       |      -------+-----> A <-------+-------      |
 *       | update      |   BLACKHOLE     | marked_update |
 *       +-----------+                   +---------------+
- *       |           |                   |               | 
+ *       |           |                   |               |
 *            ...                              ...
 *       |           |                   +---------------+
 *       +-----------+
@@ -1941,7 +2018,7 @@ stg_noDuplicatezh /* no arg list: explicit stack layout */
     SAVE_THREAD_STATE();
     ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
     ccall threadPaused (MyCapability() "ptr", CurrentTSO "ptr");
-    
+
     if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) {
         jump stg_threadFinished [];
     } else {
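Note on the C-finalizer change: stg_addCFinalizzerToWeakzh replaces stg_mkWeakForeignEnvzh. Instead of one fixed ARR_WORDS payload per Weak#, each C finalizer becomes a StgCFinalizerList cell consed onto the weak pointer's cfinalizers field, and stg_finalizzeWeakzh hands the whole list to runCFinalizers. Because new cells are pushed onto the head of the list, C finalizers run in reverse order of attachment. At the Haskell level this machinery sits underneath the C-finalizer side of Foreign.ForeignPtr; a minimal sketch, in which log_release is a hypothetical C function (any foreign function pointer of type FinalizerPtr would do):

    {-# LANGUAGE ForeignFunctionInterface #-}
    import Foreign.ForeignPtr (FinalizerPtr, newForeignPtr, addForeignPtrFinalizer)
    import Foreign.Marshal.Alloc (mallocBytes, finalizerFree)
    import Foreign.Ptr (Ptr)

    -- hypothetical C-side logging finalizer: void log_release(void *)
    foreign import ccall "&log_release"
        logRelease :: FinalizerPtr ()

    main :: IO ()
    main = do
        p  <- mallocBytes 64 :: IO (Ptr ())
        fp <- newForeignPtr finalizerFree p   -- first C finalizer (free)
        -- A second C finalizer on the same ForeignPtr conses another
        -- StgCFinalizerList cell onto the same Weak#; log_release runs
        -- before free, since it was attached later.
        addForeignPtrFinalizer logRelease fp
        return ()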
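On the weak-pointer locking change: stg_finalizzeWeakzh now takes the closure lock before inspecting the Weak#, and stg_deRefWeakzh waits out a stg_WHITEHOLE_info info pointer via LOCK_CLOSURE, so dereferencing can race with finalization from another thread without observing a half-killed weak pointer. The observable behaviour through the public System.Mem.Weak API, as a small sketch:

    import System.Mem.Weak (mkWeak, deRefWeak, finalize)
    import Data.IORef (newIORef, readIORef)

    main :: IO ()
    main = do
        ref <- newIORef (0 :: Int)
        -- key and value are both the IORef; the finalizer is plain IO
        w <- mkWeak ref ref (Just (putStrLn "finalized"))
        r1 <- deRefWeak w
        putStrLn (maybe "dead" (const "alive") r1)  -- "alive"
        finalize w     -- runs the finalizer; the Weak# becomes DEAD_WEAK
        r2 <- deRefWeak w
        putStrLn (maybe "dead" (const "alive") r2)  -- "dead", even though...
        readIORef ref >>= print  -- ...the key itself is still reachable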
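The take/put paths keep a FIFO queue of blocked threads hanging off the MVar (StgMVarTSOQueue cells appended at StgMVar_tail, woken from StgMVar_head — "wake up the first one"), which is what gives MVar its documented fairness: waiters are released in arrival order. That makes an MVar () a fair lock; a sketch essentially equivalent to base's withMVar:

    import Control.Concurrent.MVar (MVar, takeMVar, putMVar)
    import Control.Exception (mask, onException)

    -- waiters blocked in takeMVar are released in FIFO order by stg_putMVarzh
    withLock :: MVar () -> IO a -> IO a
    withLock lock act = mask $ \restore -> do
        takeMVar lock
        r <- restore act `onException` putMVar lock ()
        putMVar lock ()
        return r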
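Finally, stg_readMVarzh and stg_tryReadMVarzh make readMVar a single primitive rather than a takeMVar/putMVar pair: readers block with BlockedOnMVarRead at the front of the queue, and a putMVar walks past every leading reader (the new why_blocked loop) before handing the value to the first real taker, so a read can no longer be interleaved with, or starved by, a competing takeMVar. A minimal sketch of the user-level behaviour, assuming a base that exposes the primop-backed readMVar (forkWait is a throwaway helper, not a library function):

    import Control.Concurrent (forkIO)
    import Control.Concurrent.MVar

    -- run an action in a thread and hand back an MVar holding its result
    forkWait :: IO a -> IO (MVar a)
    forkWait act = do
        box <- newEmptyMVar
        _ <- forkIO (act >>= putMVar box)
        return box

    main :: IO ()
    main = do
        m  <- newEmptyMVar :: IO (MVar Int)
        r1 <- forkWait (readMVar m)  -- both readers block on the empty MVar
        r2 <- forkWait (readMVar m)
        putMVar m 42                 -- one put wakes all queued readers
        mapM takeMVar [r1, r2] >>= print  -- [42,42]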