summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Gamari <ben@smart-cactus.org>2019-09-30 16:04:17 +0000
committerBen Gamari <ben@smart-cactus.org>2020-11-08 09:41:59 -0500
commit1061b8e9f512411fffe33e6426982253bdb8aee7 (patch)
tree782e275188e799c3e9ec96009d08fe8d96a72e1c
parent5c1e80156543cdbb140d23411ccb34672bfbc9e5 (diff)
downloadhaskell-1061b8e9f512411fffe33e6426982253bdb8aee7.tar.gz
rts/STM: Use atomics
This fixes a potentially harmful race where we failed to synchronize before looking at a TVar's current_value. Also did a bit of refactoring to avoid abstract over management of max_commits.
-rw-r--r--rts/STM.c72
1 files changed, 45 insertions, 27 deletions
diff --git a/rts/STM.c b/rts/STM.c
index cff0d55082..31a912e154 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -210,7 +210,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
StgClosure *expected) {
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
return (result == expected);
}
@@ -231,7 +231,7 @@ static void lock_stm(StgTRecHeader *trec) {
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
TRACE("%p : unlock_stm()", trec);
ASSERT(smp_locked == trec);
- smp_locked = 0;
+ SEQ_CST_STORE(&smp_locked, 0);
}
static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
@@ -240,7 +240,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
StgClosure *result;
TRACE("%p : lock_tvar(%p)", trec, s);
ASSERT(smp_locked == trec);
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
return result;
}
@@ -253,7 +253,7 @@ static void *unlock_tvar(Capability *cap,
ASSERT(smp_locked == trec);
if (force_update) {
StgClosure *old_value = s -> current_value;
- s -> current_value = c;
+ RELAXED_STORE(&s->current_value, c);
dirty_TVAR(cap, s, old_value);
}
}
@@ -265,7 +265,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
ASSERT(smp_locked == trec);
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
TRACE("%p : %d", result ? "success" : "failure");
return (result == expected);
}
@@ -292,7 +292,7 @@ static StgClosure *lock_tvar(Capability *cap,
TRACE("%p : lock_tvar(%p)", trec, s);
do {
do {
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
} while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
} while (cas((void *)&(s -> current_value),
(StgWord)result, (StgWord)trec) != (StgWord)result);
@@ -311,8 +311,8 @@ static void unlock_tvar(Capability *cap,
StgClosure *c,
StgBool force_update STG_UNUSED) {
TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
- ASSERT(s -> current_value == (StgClosure *)trec);
- s -> current_value = c;
+ ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec);
+ RELEASE_STORE(&s->current_value, c);
dirty_TVAR(cap, s, (StgClosure *) trec);
}
@@ -532,8 +532,8 @@ static void build_watch_queue_entries_for_trec(Capability *cap,
StgTVarWatchQueue *fq;
s = e -> tvar;
TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
- ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
- NACQ_ASSERT(s -> current_value == e -> expected_value);
+ ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec);
+ NACQ_ASSERT(RELAXED_LOAD(&s->current_value) == e -> expected_value);
fq = s -> first_watch_queue_entry;
q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
q -> next_queue_entry = fq;
@@ -569,7 +569,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
trec,
q -> closure,
s);
- ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
+ ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec);
nq = q -> next_queue_entry;
pq = q -> prev_queue_entry;
if (nq != END_STM_WATCH_QUEUE) {
@@ -727,7 +727,7 @@ static StgBool entry_is_read_only(TRecEntry *e) {
static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
StgClosure *c;
StgBool result;
- c = s -> current_value;
+ c = RELAXED_LOAD(&s->current_value);
result = (c == (StgClosure *) h);
return result;
}
@@ -800,13 +800,16 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
ASSERT(config_use_read_phase);
IF_STM_FG_LOCKS({
TRACE("%p : will need to check %p", trec, s);
- if (s -> current_value != e -> expected_value) {
+ // The memory ordering here must ensure that we have two distinct
+ // reads to current_value, with the read from num_updates between
+ // them.
+ if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match", trec);
result = false;
BREAK_FOR_EACH;
}
- e -> num_updates = s -> num_updates;
- if (s -> current_value != e -> expected_value) {
+ e->num_updates = SEQ_CST_LOAD(&s->num_updates);
+ if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match (race)", trec);
result = false;
BREAK_FOR_EACH;
@@ -828,7 +831,7 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec. This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
-// validate_and_acquire_ownership. If no update is seen to any TVar than
+// validate_and_acquire_ownership. If no update is seen to any TVar then
// all of them contained their expected values at the start of the call to
// check_read_only.
//
@@ -847,11 +850,16 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
if (entry_is_read_only(e)) {
TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
+ // We must first load current_value then num_updates; this is inverse of
+ // the order of the stores in stmCommitTransaction.
+ StgClosure *current_value = SEQ_CST_LOAD(&s->current_value);
+ StgInt num_updates = SEQ_CST_LOAD(&s->num_updates);
+
// Note we need both checks and in this order as the TVar could be
// locked by another transaction that is committing but has not yet
// incremented `num_updates` (See #7815).
- if (s -> current_value != e -> expected_value ||
- s -> num_updates != e -> num_updates) {
+ if (current_value != e->expected_value ||
+ num_updates != e->num_updates) {
TRACE("%p : mismatch", trec);
result = false;
BREAK_FOR_EACH;
@@ -887,17 +895,22 @@ void stmPreGCHook (Capability *cap) {
#define TOKEN_BATCH_SIZE 1024
+#if defined(THREADED_RTS)
+
static volatile StgInt64 max_commits = 0;
-#if defined(THREADED_RTS)
static volatile StgWord token_locked = false;
+static StgInt64 getMaxCommits(void) {
+ return RELAXED_LOAD(&max_commits);
+}
+
static void getTokenBatch(Capability *cap) {
while (cas((void *)&token_locked, false, true) == true) { /* nothing */ }
- max_commits += TOKEN_BATCH_SIZE;
- TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
+ NONATOMIC_ADD(&max_commits, TOKEN_BATCH_SIZE);
+ TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, RELAXED_LOAD(&max_commits));
cap -> transaction_tokens = TOKEN_BATCH_SIZE;
- token_locked = false;
+ RELEASE_STORE(&token_locked, false);
}
static void getToken(Capability *cap) {
@@ -907,6 +920,10 @@ static void getToken(Capability *cap) {
cap -> transaction_tokens --;
}
#else
+static StgInt64 getMaxCommits(void) {
+ return 0;
+}
+
static void getToken(Capability *cap STG_UNUSED) {
// Nothing
}
@@ -1062,7 +1079,7 @@ static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeade
/*......................................................................*/
StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
- StgInt64 max_commits_at_start = max_commits;
+ StgInt64 max_commits_at_start = getMaxCommits();
TRACE("%p : stmCommitTransaction()", trec);
ASSERT(trec != NO_TREC);
@@ -1088,7 +1105,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
result = check_read_only(trec);
TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
- max_commits_at_end = max_commits;
+ max_commits_at_end = getMaxCommits();
max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
(n_capabilities * TOKEN_BATCH_SIZE));
if (((max_concurrent_commits >> 32) > 0) || shake()) {
@@ -1113,7 +1130,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
unpark_waiters_on(cap,s);
IF_STM_FG_LOCKS({
- s -> num_updates ++;
+ // We have locked the TVar therefore nonatomic addition is sufficient
+ NONATOMIC_ADD(&s->num_updates, 1);
});
unlock_tvar(cap, trec, s, e -> new_value, true);
}
@@ -1269,12 +1287,12 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) {
static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
StgClosure *result;
- result = tvar -> current_value;
+ result = ACQUIRE_LOAD(&tvar->current_value);
#if defined(STM_FG_LOCKS)
while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
- result = tvar -> current_value;
+ result = ACQUIRE_LOAD(&tvar->current_value);
}
#endif