diff options
author | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
---|---|---|
committer | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
commit | 1bf1084f2b10c3b47fd1a588d85d21ed0eb41d0c (patch) | |
tree | 46dcd36c86e7fbc6e5df36deb463b33e9967a6f7 /Source/WTF/wtf/ParkingLot.cpp | |
parent | 32761a6cee1d0dee366b885b7b9c777e67885688 (diff) | |
download | WebKitGtk-tarball-master.tar.gz |
webkitgtk-2.16.5HEADwebkitgtk-2.16.5master
Diffstat (limited to 'Source/WTF/wtf/ParkingLot.cpp')
-rw-r--r-- | Source/WTF/wtf/ParkingLot.cpp | 813 |
1 files changed, 813 insertions, 0 deletions
diff --git a/Source/WTF/wtf/ParkingLot.cpp b/Source/WTF/wtf/ParkingLot.cpp new file mode 100644 index 000000000..e6c678bef --- /dev/null +++ b/Source/WTF/wtf/ParkingLot.cpp @@ -0,0 +1,813 @@ +/* + * Copyright (C) 2015-2016 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "ParkingLot.h" + +#include "CurrentTime.h" +#include "DataLog.h" +#include "HashFunctions.h" +#include "StringPrintStream.h" +#include "ThreadSpecific.h" +#include "ThreadingPrimitives.h" +#include "Vector.h" +#include "WeakRandom.h" +#include "WordLock.h" +#include <condition_variable> +#include <mutex> +#include <thread> + +namespace WTF { + +namespace { + +const bool verbose = false; + +struct ThreadData : public ThreadSafeRefCounted<ThreadData> { + WTF_MAKE_FAST_ALLOCATED; +public: + + ThreadData(); + ~ThreadData(); + + ThreadIdentifier threadIdentifier; + + Mutex parkingLock; + ThreadCondition parkingCondition; + + const void* address { nullptr }; + + ThreadData* nextInQueue { nullptr }; + + intptr_t token { 0 }; +}; + +enum class DequeueResult { + Ignore, + RemoveAndContinue, + RemoveAndStop +}; + +struct Bucket { + WTF_MAKE_FAST_ALLOCATED; +public: + Bucket() + : random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) // Cannot use default seed since that recurses into Lock. + { + } + + void enqueue(ThreadData* data) + { + if (verbose) + dataLog(toString(currentThread(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n")); + ASSERT(data->address); + ASSERT(!data->nextInQueue); + + if (queueTail) { + queueTail->nextInQueue = data; + queueTail = data; + return; + } + + queueHead = data; + queueTail = data; + } + + template<typename Functor> + void genericDequeue(const Functor& functor) + { + if (verbose) + dataLog(toString(currentThread(), ": dequeueing from bucket at ", RawPointer(this), "\n")); + + if (!queueHead) { + if (verbose) + dataLog(toString(currentThread(), ": empty.\n")); + return; + } + + // This loop is a very clever abomination. The induction variables are the pointer to the + // pointer to the current node, and the pointer to the previous node. This gives us everything + // we need to both proceed forward to the next node, and to remove nodes while maintaining the + // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head + // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then + // we'd want queueTail to be set to nullptr. This works because: + // + // currentPtr == &queueHead + // previous == nullptr + // + // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer + // that used to point to this node to instead point to this node's successor. Another example: + // if we were at the second node in the queue, then we'd have: + // + // currentPtr == &queueHead->nextInQueue + // previous == queueHead + // + // If this node is not equal to queueTail, then removing it simply means making + // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm + // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set + // queueTail to previous, which in this case is queueHead - thus making the queue look like a + // proper one-element queue with queueHead == queueTail. + bool shouldContinue = true; + ThreadData** currentPtr = &queueHead; + ThreadData* previous = nullptr; + + double time = monotonicallyIncreasingTimeMS(); + bool timeToBeFair = false; + if (time > nextFairTime) + timeToBeFair = true; + + bool didDequeue = false; + + while (shouldContinue) { + ThreadData* current = *currentPtr; + if (verbose) + dataLog(toString(currentThread(), ": got thread ", RawPointer(current), "\n")); + if (!current) + break; + DequeueResult result = functor(current, timeToBeFair); + switch (result) { + case DequeueResult::Ignore: + if (verbose) + dataLog(toString(currentThread(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n")); + previous = current; + currentPtr = &(*currentPtr)->nextInQueue; + break; + case DequeueResult::RemoveAndStop: + shouldContinue = false; + FALLTHROUGH; + case DequeueResult::RemoveAndContinue: + if (verbose) + dataLog(toString(currentThread(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n")); + if (current == queueTail) + queueTail = previous; + didDequeue = true; + *currentPtr = current->nextInQueue; + current->nextInQueue = nullptr; + break; + } + } + + if (timeToBeFair && didDequeue) + nextFairTime = time + random.get(); + + ASSERT(!!queueHead == !!queueTail); + } + + ThreadData* dequeue() + { + ThreadData* result = nullptr; + genericDequeue( + [&] (ThreadData* element, bool) -> DequeueResult { + result = element; + return DequeueResult::RemoveAndStop; + }); + return result; + } + + ThreadData* queueHead { nullptr }; + ThreadData* queueTail { nullptr }; + + // This lock protects the entire bucket. Thou shall not make changes to Bucket without holding + // this lock. + WordLock lock; + + double nextFairTime { 0 }; + + WeakRandom random; + + // Put some distane between buckets in memory. This is one of several mitigations against false + // sharing. + char padding[64]; +}; + +struct Hashtable; + +// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors. +Vector<Hashtable*>* hashtables; +StaticWordLock hashtablesLock; + +struct Hashtable { + unsigned size; + Atomic<Bucket*> data[1]; + + static Hashtable* create(unsigned size) + { + ASSERT(size >= 1); + + Hashtable* result = static_cast<Hashtable*>( + fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1))); + result->size = size; + + { + // This is not fast and it's not data-access parallel, but that's fine, because + // hashtable resizing is guaranteed to be rare and it will never happen in steady + // state. + WordLockHolder locker(hashtablesLock); + if (!hashtables) + hashtables = new Vector<Hashtable*>(); + hashtables->append(result); + } + + return result; + } + + static void destroy(Hashtable* hashtable) + { + { + // This is not fast, but that's OK. See comment in create(). + WordLockHolder locker(hashtablesLock); + hashtables->removeFirst(hashtable); + } + + fastFree(hashtable); + } +}; + +Atomic<Hashtable*> hashtable; +Atomic<unsigned> numThreads; + +// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the +// memory usage per thread will still be less than 1KB. +const unsigned maxLoadFactor = 3; + +const unsigned growthFactor = 2; + +unsigned hashAddress(const void* address) +{ + return WTF::PtrHash<const void*>::hash(address); +} + +Hashtable* ensureHashtable() +{ + for (;;) { + Hashtable* currentHashtable = hashtable.load(); + + if (currentHashtable) + return currentHashtable; + + if (!currentHashtable) { + currentHashtable = Hashtable::create(maxLoadFactor); + if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) { + if (verbose) + dataLog(toString(currentThread(), ": created initial hashtable ", RawPointer(currentHashtable), "\n")); + return currentHashtable; + } + + Hashtable::destroy(currentHashtable); + } + } +} + +// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different +// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty +// slow and not scalable, so it's only used during thread creation and for debugging/testing. +Vector<Bucket*> lockHashtable() +{ + for (;;) { + Hashtable* currentHashtable = ensureHashtable(); + + ASSERT(currentHashtable); + + // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that + // we can lock all of the buckets, not just the ones that are materialized. + Vector<Bucket*> buckets; + for (unsigned i = currentHashtable->size; i--;) { + Atomic<Bucket*>& bucketPointer = currentHashtable->data[i]; + + for (;;) { + Bucket* bucket = bucketPointer.load(); + + if (!bucket) { + bucket = new Bucket(); + if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) { + delete bucket; + continue; + } + } + + buckets.append(bucket); + break; + } + } + + // Now lock the buckets in the right order. + std::sort(buckets.begin(), buckets.end()); + for (Bucket* bucket : buckets) + bucket->lock.lock(); + + // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it + // now. + if (hashtable.load() == currentHashtable) + return buckets; + + // The hashtable rehashed. Unlock everything and try again. + for (Bucket* bucket : buckets) + bucket->lock.unlock(); + } +} + +void unlockHashtable(const Vector<Bucket*>& buckets) +{ + for (Bucket* bucket : buckets) + bucket->lock.unlock(); +} + +// Rehash the hashtable to handle numThreads threads. +void ensureHashtableSize(unsigned numThreads) +{ + // We try to ensure that the size of the hashtable used for thread queues is always large enough + // to avoid collisions. So, since we started a new thread, we may need to increase the size of the + // hashtable. This does just that. Note that we never free the old spine, since we never lock + // around spine accesses (i.e. the "hashtable" global variable). + + // First do a fast check to see if rehashing is needed. + Hashtable* oldHashtable = hashtable.load(); + if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) { + if (verbose) + dataLog(toString(currentThread(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n")); + return; + } + + // Seems like we *might* have to rehash, so lock the hashtable and try again. + Vector<Bucket*> bucketsToUnlock = lockHashtable(); + + // Check again, since the hashtable could have rehashed while we were locking it. Also, + // lockHashtable() creates an initial hashtable for us. + oldHashtable = hashtable.load(); + if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) { + if (verbose) + dataLog(toString(currentThread(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n")); + unlockHashtable(bucketsToUnlock); + return; + } + + Vector<Bucket*> reusableBuckets = bucketsToUnlock; + + // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas + // are placed into the vector in queue order. + Vector<ThreadData*> threadDatas; + for (Bucket* bucket : reusableBuckets) { + while (ThreadData* threadData = bucket->dequeue()) + threadDatas.append(threadData); + } + + unsigned newSize = numThreads * growthFactor * maxLoadFactor; + RELEASE_ASSERT(newSize > oldHashtable->size); + + Hashtable* newHashtable = Hashtable::create(newSize); + if (verbose) + dataLog(toString(currentThread(), ": created new hashtable: ", RawPointer(newHashtable), "\n")); + for (ThreadData* threadData : threadDatas) { + if (verbose) + dataLog(toString(currentThread(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n")); + unsigned hash = hashAddress(threadData->address); + unsigned index = hash % newHashtable->size; + if (verbose) + dataLog(toString(currentThread(), ": index = ", index, "\n")); + Bucket* bucket = newHashtable->data[index].load(); + if (!bucket) { + if (reusableBuckets.isEmpty()) + bucket = new Bucket(); + else + bucket = reusableBuckets.takeLast(); + newHashtable->data[index].store(bucket); + } + + bucket->enqueue(threadData); + } + + // At this point there may be some buckets left unreused. This could easily happen if the + // number of enqueued threads right now is low but the high watermark of the number of threads + // enqueued was high. We place these buckets into the hashtable basically at random, just to + // make sure we don't leak them. + for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) { + Atomic<Bucket*>& bucketPtr = newHashtable->data[i]; + if (bucketPtr.load()) + continue; + bucketPtr.store(reusableBuckets.takeLast()); + } + + // Since we increased the size of the hashtable, we should have exhausted our preallocated + // buckets by now. + ASSERT(reusableBuckets.isEmpty()); + + // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and + // roll. After we install the new hashtable, we can release all bucket locks. + + bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable; + RELEASE_ASSERT(result); + + unlockHashtable(bucketsToUnlock); +} + +ThreadData::ThreadData() + : threadIdentifier(currentThread()) +{ + unsigned currentNumThreads; + for (;;) { + unsigned oldNumThreads = numThreads.load(); + currentNumThreads = oldNumThreads + 1; + if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads)) + break; + } + + ensureHashtableSize(currentNumThreads); +} + +ThreadData::~ThreadData() +{ + for (;;) { + unsigned oldNumThreads = numThreads.load(); + if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1)) + break; + } +} + +ThreadData* myThreadData() +{ + static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData; + static std::once_flag initializeOnce; + std::call_once( + initializeOnce, + [] { + threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>(); + }); + + RefPtr<ThreadData>& result = **threadData; + + if (!result) + result = adoptRef(new ThreadData()); + + return result.get(); +} + +template<typename Functor> +bool enqueue(const void* address, const Functor& functor) +{ + unsigned hash = hashAddress(address); + + for (;;) { + Hashtable* myHashtable = ensureHashtable(); + unsigned index = hash % myHashtable->size; + Atomic<Bucket*>& bucketPointer = myHashtable->data[index]; + Bucket* bucket; + for (;;) { + bucket = bucketPointer.load(); + if (!bucket) { + bucket = new Bucket(); + if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) { + delete bucket; + continue; + } + } + break; + } + if (verbose) + dataLog(toString(currentThread(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n")); + bucket->lock.lock(); + + // At this point the hashtable could have rehashed under us. + if (hashtable.load() != myHashtable) { + bucket->lock.unlock(); + continue; + } + + ThreadData* threadData = functor(); + bool result; + if (threadData) { + if (verbose) + dataLog(toString(currentThread(), ": proceeding to enqueue ", RawPointer(threadData), "\n")); + bucket->enqueue(threadData); + result = true; + } else + result = false; + bucket->lock.unlock(); + return result; + } +} + +enum class BucketMode { + EnsureNonEmpty, + IgnoreEmpty +}; + +template<typename DequeueFunctor, typename FinishFunctor> +bool dequeue( + const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor, + const FinishFunctor& finishFunctor) +{ + unsigned hash = hashAddress(address); + + for (;;) { + Hashtable* myHashtable = ensureHashtable(); + unsigned index = hash % myHashtable->size; + Atomic<Bucket*>& bucketPointer = myHashtable->data[index]; + Bucket* bucket = bucketPointer.load(); + if (!bucket) { + if (bucketMode == BucketMode::IgnoreEmpty) + return false; + + for (;;) { + bucket = bucketPointer.load(); + if (!bucket) { + bucket = new Bucket(); + if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) { + delete bucket; + continue; + } + } + break; + } + } + + bucket->lock.lock(); + + // At this point the hashtable could have rehashed under us. + if (hashtable.load() != myHashtable) { + bucket->lock.unlock(); + continue; + } + + bucket->genericDequeue(dequeueFunctor); + bool result = !!bucket->queueHead; + finishFunctor(result); + bucket->lock.unlock(); + return result; + } +} + +} // anonymous namespace + +NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl( + const void* address, + const ScopedLambda<bool()>& validation, + const ScopedLambda<void()>& beforeSleep, + const TimeWithDynamicClockType& timeout) +{ + if (verbose) + dataLog(toString(currentThread(), ": parking.\n")); + + ThreadData* me = myThreadData(); + me->token = 0; + + // Guard against someone calling parkConditionally() recursively from beforeSleep(). + RELEASE_ASSERT(!me->address); + + bool enqueueResult = enqueue( + address, + [&] () -> ThreadData* { + if (!validation()) + return nullptr; + + me->address = address; + return me; + }); + + if (!enqueueResult) + return ParkResult(); + + beforeSleep(); + + bool didGetDequeued; + { + MutexLocker locker(me->parkingLock); + while (me->address && timeout.nowWithSameClock() < timeout) { + me->parkingCondition.timedWait( + me->parkingLock, timeout.approximateWallTime().secondsSinceEpoch().value()); + + // It's possible for the OS to decide not to wait. If it does that then it will also + // decide not to release the lock. If there's a bug in the time math, then this could + // result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating + // spin. + me->parkingLock.unlock(); + me->parkingLock.lock(); + } + ASSERT(!me->address || me->address == address); + didGetDequeued = !me->address; + } + + if (didGetDequeued) { + // Great! We actually got dequeued rather than the timeout expiring. + ParkResult result; + result.wasUnparked = true; + result.token = me->token; + return result; + } + + // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet. + + bool didDequeue = false; + dequeue( + address, BucketMode::IgnoreEmpty, + [&] (ThreadData* element, bool) { + if (element == me) { + didDequeue = true; + return DequeueResult::RemoveAndStop; + } + return DequeueResult::Ignore; + }, + [] (bool) { }); + + // If didDequeue is true, then we dequeued ourselves. This means that we were not unparked. + // If didDequeue is false, then someone unparked us. + + RELEASE_ASSERT(!me->nextInQueue); + + // Make sure that no matter what, me->address is null after this point. + { + MutexLocker locker(me->parkingLock); + if (!didDequeue) { + // If we did not dequeue ourselves, then someone else did. They will set our address to + // null. We don't want to proceed until they do this, because otherwise, they may set + // our address to null in some distant future when we're already trying to wait for + // other things. + while (me->address) + me->parkingCondition.wait(me->parkingLock); + } + me->address = nullptr; + } + + ParkResult result; + result.wasUnparked = !didDequeue; + if (!didDequeue) { + // If we were unparked then there should be a token. + result.token = me->token; + } + return result; +} + +NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address) +{ + if (verbose) + dataLog(toString(currentThread(), ": unparking one.\n")); + + UnparkResult result; + + RefPtr<ThreadData> threadData; + result.mayHaveMoreThreads = dequeue( + address, + // Why is this here? + // FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty + // without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform + // some operation while holding the bucket lock, which usually goes into the finish func. + // But if that operation is a no-op, then it's not clear why we need this. + BucketMode::EnsureNonEmpty, + [&] (ThreadData* element, bool) { + if (element->address != address) + return DequeueResult::Ignore; + threadData = element; + result.didUnparkThread = true; + return DequeueResult::RemoveAndStop; + }, + [] (bool) { }); + + if (!threadData) { + ASSERT(!result.didUnparkThread); + result.mayHaveMoreThreads = false; + return result; + } + + ASSERT(threadData->address); + + { + MutexLocker locker(threadData->parkingLock); + threadData->address = nullptr; + threadData->token = 0; + } + threadData->parkingCondition.signal(); + + return result; +} + +NEVER_INLINE void ParkingLot::unparkOneImpl( + const void* address, + const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback) +{ + if (verbose) + dataLog(toString(currentThread(), ": unparking one the hard way.\n")); + + RefPtr<ThreadData> threadData; + bool timeToBeFair = false; + dequeue( + address, + BucketMode::EnsureNonEmpty, + [&] (ThreadData* element, bool passedTimeToBeFair) { + if (element->address != address) + return DequeueResult::Ignore; + threadData = element; + timeToBeFair = passedTimeToBeFair; + return DequeueResult::RemoveAndStop; + }, + [&] (bool mayHaveMoreThreads) { + UnparkResult result; + result.didUnparkThread = !!threadData; + result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads; + if (timeToBeFair) + RELEASE_ASSERT(threadData); + result.timeToBeFair = timeToBeFair; + intptr_t token = callback(result); + if (threadData) + threadData->token = token; + }); + + if (!threadData) + return; + + ASSERT(threadData->address); + + { + MutexLocker locker(threadData->parkingLock); + threadData->address = nullptr; + } + // At this point, the threadData may die. Good thing we have a RefPtr<> on it. + threadData->parkingCondition.signal(); +} + +NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count) +{ + if (!count) + return 0; + + if (verbose) + dataLog(toString(currentThread(), ": unparking count = ", count, " from ", RawPointer(address), ".\n")); + + Vector<RefPtr<ThreadData>, 8> threadDatas; + dequeue( + address, + // FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does, + // but that seems wrong. + BucketMode::IgnoreEmpty, + [&] (ThreadData* element, bool) { + if (verbose) + dataLog(toString(currentThread(), ": Observing element with address = ", RawPointer(element->address), "\n")); + if (element->address != address) + return DequeueResult::Ignore; + threadDatas.append(element); + if (threadDatas.size() == count) + return DequeueResult::RemoveAndStop; + return DequeueResult::RemoveAndContinue; + }, + [] (bool) { }); + + for (RefPtr<ThreadData>& threadData : threadDatas) { + if (verbose) + dataLog(toString(currentThread(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n")); + ASSERT(threadData->address); + { + MutexLocker locker(threadData->parkingLock); + threadData->address = nullptr; + } + threadData->parkingCondition.signal(); + } + + if (verbose) + dataLog(toString(currentThread(), ": done unparking.\n")); + + return threadDatas.size(); +} + +NEVER_INLINE void ParkingLot::unparkAll(const void* address) +{ + unparkCount(address, UINT_MAX); +} + +NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(ThreadIdentifier, const void*)>& callback) +{ + Vector<Bucket*> bucketsToUnlock = lockHashtable(); + + Hashtable* currentHashtable = hashtable.load(); + for (unsigned i = currentHashtable->size; i--;) { + Bucket* bucket = currentHashtable->data[i].load(); + if (!bucket) + continue; + for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue) + callback(currentThreadData->threadIdentifier, currentThreadData->address); + } + + unlockHashtable(bucketsToUnlock); +} + +} // namespace WTF + |