diff options
author | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
---|---|---|
committer | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
commit | 1bf1084f2b10c3b47fd1a588d85d21ed0eb41d0c (patch) | |
tree | 46dcd36c86e7fbc6e5df36deb463b33e9967a6f7 /Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp | |
parent | 32761a6cee1d0dee366b885b7b9c777e67885688 (diff) | |
download | WebKitGtk-tarball-master.tar.gz |
webkitgtk-2.16.5HEADwebkitgtk-2.16.5master
Diffstat (limited to 'Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp')
-rw-r--r-- | Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp | 474 |
1 files changed, 203 insertions, 271 deletions
diff --git a/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp b/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp index 65b8d2bc1..f0d86b615 100644 --- a/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp +++ b/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp @@ -35,9 +35,9 @@ #include "WorkerThreadableWebSocketChannel.h" #include "Blob.h" -#include "CrossThreadTask.h" #include "Document.h" #include "ScriptExecutionContext.h" +#include "SocketProvider.h" #include "ThreadableWebSocketChannelClientWrapper.h" #include "WebSocketChannel.h" #include "WebSocketChannelClient.h" @@ -47,15 +47,15 @@ #include "WorkerThread.h" #include <runtime/ArrayBuffer.h> #include <wtf/MainThread.h> -#include <wtf/PassRefPtr.h> #include <wtf/text/WTFString.h> namespace WebCore { -WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope* context, WebSocketChannelClient* client, const String& taskMode) +WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& context, WebSocketChannelClient& client, const String& taskMode, SocketProvider& provider) : m_workerGlobalScope(context) , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client)) - , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope, taskMode)) + , m_bridge(Bridge::create(m_workerClientWrapper.copyRef(), m_workerGlobalScope.copyRef(), taskMode, provider)) + , m_socketProvider(provider) { m_bridge->initialize(); } @@ -74,13 +74,11 @@ void WorkerThreadableWebSocketChannel::connect(const URL& url, const String& pro String WorkerThreadableWebSocketChannel::subprotocol() { - ASSERT(m_workerClientWrapper); return m_workerClientWrapper->subprotocol(); } String WorkerThreadableWebSocketChannel::extensions() { - ASSERT(m_workerClientWrapper); return m_workerClientWrapper->extensions(); } @@ -98,14 +96,14 @@ ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(co return m_bridge->send(binaryData, byteOffset, byteLength); } -ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const Blob& binaryData) +ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(Blob& binaryData) { if (!m_bridge) return ThreadableWebSocketChannel::SendFail; return m_bridge->send(binaryData); } -unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const +unsigned WorkerThreadableWebSocketChannel::bufferedAmount() const { if (!m_bridge) return 0; @@ -127,7 +125,7 @@ void WorkerThreadableWebSocketChannel::fail(const String& reason) void WorkerThreadableWebSocketChannel::disconnect() { m_bridge->disconnect(); - m_bridge.clear(); + m_bridge = nullptr; } void WorkerThreadableWebSocketChannel::suspend() @@ -144,10 +142,10 @@ void WorkerThreadableWebSocketChannel::resume() m_bridge->resume(); } -WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode) - : m_workerClientWrapper(clientWrapper) +WorkerThreadableWebSocketChannel::Peer::Peer(Ref<ThreadableWebSocketChannelClientWrapper>&& clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext& context, const String& taskMode, SocketProvider& provider) + : m_workerClientWrapper(WTFMove(clientWrapper)) , m_loaderProxy(loaderProxy) - , m_mainWebSocketChannel(WebSocketChannel::create(toDocument(context), this)) + , m_mainWebSocketChannel(WebSocketChannel::create(downcast<Document>(context), *this, provider)) , m_taskMode(taskMode) { ASSERT(isMainThread()); @@ -168,52 +166,53 @@ void WorkerThreadableWebSocketChannel::Peer::connect(const URL& url, const Strin m_mainWebSocketChannel->connect(url, protocol); } -static void workerGlobalScopeDidSend(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, ThreadableWebSocketChannel::SendResult sendRequestResult) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->setSendRequestResult(sendRequestResult); -} - void WorkerThreadableWebSocketChannel::Peer::send(const String& message) { ASSERT(isMainThread()); - if (!m_mainWebSocketChannel || !m_workerClientWrapper) + if (!m_mainWebSocketChannel) return; + ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable { + workerClientWrapper->setSendRequestResult(sendRequestResult); + }, m_taskMode); } void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData) { ASSERT(isMainThread()); - if (!m_mainWebSocketChannel || !m_workerClientWrapper) + if (!m_mainWebSocketChannel) return; + ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength()); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable { + workerClientWrapper->setSendRequestResult(sendRequestResult); + }, m_taskMode); } -void WorkerThreadableWebSocketChannel::Peer::send(const Blob& binaryData) +void WorkerThreadableWebSocketChannel::Peer::send(Blob& binaryData) { ASSERT(isMainThread()); - if (!m_mainWebSocketChannel || !m_workerClientWrapper) + if (!m_mainWebSocketChannel) return; - ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode); -} -static void workerGlobalScopeDidGetBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->setBufferedAmount(bufferedAmount); + ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable { + workerClientWrapper->setSendRequestResult(sendRequestResult); + }, m_taskMode); } void WorkerThreadableWebSocketChannel::Peer::bufferedAmount() { ASSERT(isMainThread()); - if (!m_mainWebSocketChannel || !m_workerClientWrapper) + if (!m_mainWebSocketChannel) return; - unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount(); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode); + + unsigned bufferedAmount = m_mainWebSocketChannel->bufferedAmount(); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), bufferedAmount](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->setBufferedAmount(bufferedAmount); + }, m_taskMode); } void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason) @@ -238,7 +237,7 @@ void WorkerThreadableWebSocketChannel::Peer::disconnect() if (!m_mainWebSocketChannel) return; m_mainWebSocketChannel->disconnect(); - m_mainWebSocketChannel = 0; + m_mainWebSocketChannel = nullptr; } void WorkerThreadableWebSocketChannel::Peer::suspend() @@ -257,101 +256,98 @@ void WorkerThreadableWebSocketChannel::Peer::resume() m_mainWebSocketChannel->resume(); } -static void workerGlobalScopeDidConnect(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->setSubprotocol(subprotocol); - workerClientWrapper->setExtensions(extensions); - workerClientWrapper->didConnect(); -} - void WorkerThreadableWebSocketChannel::Peer::didConnect() { ASSERT(isMainThread()); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol(), m_mainWebSocketChannel->extensions()), m_taskMode); -} -static void workerGlobalScopeDidReceiveMessage(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->didReceiveMessage(message); + String subprotocol = m_mainWebSocketChannel->subprotocol(); + String extensions = m_mainWebSocketChannel->extensions(); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), subprotocol = subprotocol.isolatedCopy(), extensions = extensions.isolatedCopy()](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->setSubprotocol(subprotocol); + workerClientWrapper->setExtensions(extensions); + workerClientWrapper->didConnect(); + }, m_taskMode); } void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message) { ASSERT(isMainThread()); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message), m_taskMode); -} -static void workerGlobalScopeDidReceiveBinaryData(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char>> binaryData) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->didReceiveBinaryData(binaryData); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), message = message.isolatedCopy()](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->didReceiveMessage(message); + }, m_taskMode); } -void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char>> binaryData) +void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(Vector<uint8_t>&& binaryData) { ASSERT(isMainThread()); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData), m_taskMode); -} -static void workerGlobalScopeDidUpdateBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->didUpdateBufferedAmount(bufferedAmount); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), binaryData = WTFMove(binaryData)](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->didReceiveBinaryData(WTFMove(binaryData)); + }, m_taskMode); } -void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount) +void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned bufferedAmount) { ASSERT(isMainThread()); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidUpdateBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode); -} -static void workerGlobalScopeDidStartClosingHandshake(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->didStartClosingHandshake(); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), bufferedAmount](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->didUpdateBufferedAmount(bufferedAmount); + }, m_taskMode); } void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake() { ASSERT(isMainThread()); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper), m_taskMode); -} -static void workerGlobalScopeDidClose(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) -{ - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason); + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->didStartClosingHandshake(); + }, m_taskMode); } -void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) +void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) { ASSERT(isMainThread()); - m_mainWebSocketChannel = 0; - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason), m_taskMode); + m_mainWebSocketChannel = nullptr; + + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), unhandledBufferedAmount, closingHandshakeCompletion, code, reason = reason.isolatedCopy()](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason); + }, m_taskMode); } -static void workerGlobalScopeDidReceiveMessageError(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) +void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() { - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - workerClientWrapper->didReceiveMessageError(); + ASSERT(isMainThread()); + + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->didReceiveMessageError(); + }, m_taskMode); } -void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() +void WorkerThreadableWebSocketChannel::Peer::didUpgradeURL() { ASSERT(isMainThread()); - m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper), m_taskMode); + + m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + workerClientWrapper->didUpgradeURL(); + }, m_taskMode); } -WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerGlobalScope> workerGlobalScope, const String& taskMode) - : m_workerClientWrapper(workerClientWrapper) - , m_workerGlobalScope(workerGlobalScope) - , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy()) +WorkerThreadableWebSocketChannel::Bridge::Bridge(Ref<ThreadableWebSocketChannelClientWrapper>&& workerClientWrapper, Ref<WorkerGlobalScope>&& workerGlobalScope, const String& taskMode, Ref<SocketProvider>&& socketProvider) + : m_workerClientWrapper(WTFMove(workerClientWrapper)) + , m_workerGlobalScope(WTFMove(workerGlobalScope)) + , m_loaderProxy(m_workerGlobalScope->thread().workerLoaderProxy()) , m_taskMode(taskMode) - , m_peer(0) + , m_socketProvider(WTFMove(socketProvider)) { - ASSERT(m_workerClientWrapper.get()); } WorkerThreadableWebSocketChannel::Bridge::~Bridge() @@ -359,273 +355,210 @@ WorkerThreadableWebSocketChannel::Bridge::~Bridge() disconnect(); } -class WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask : public ScriptExecutionContext::Task { -public: - static PassOwnPtr<ScriptExecutionContext::Task> create(WorkerThreadableWebSocketChannel::Peer* peer, - WorkerLoaderProxy* loaderProxy, - PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) - { - return adoptPtr(new WorkerGlobalScopeDidInitializeTask(peer, loaderProxy, workerClientWrapper)); - } - - virtual ~WorkerGlobalScopeDidInitializeTask() { } - virtual void performTask(ScriptExecutionContext* context) override - { - ASSERT_UNUSED(context, context->isWorkerGlobalScope()); - if (m_workerClientWrapper->failedWebSocketChannelCreation()) { - // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer. - OwnPtr<WorkerThreadableWebSocketChannel::Peer> peer = adoptPtr(m_peer); - m_peer = 0; - m_loaderProxy->postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release())); - } else - m_workerClientWrapper->didCreateWebSocketChannel(m_peer); - } - virtual bool isCleanupTask() const override { return true; } - -private: - WorkerGlobalScopeDidInitializeTask(WorkerThreadableWebSocketChannel::Peer* peer, - WorkerLoaderProxy* loaderProxy, - PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) - : m_peer(peer) - , m_loaderProxy(loaderProxy) - , m_workerClientWrapper(workerClientWrapper) - { - } - - WorkerThreadableWebSocketChannel::Peer* m_peer; - WorkerLoaderProxy* m_loaderProxy; - RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper; -}; - -void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext* context, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode) +void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext& context, WorkerLoaderProxy& loaderProxy, Ref<ThreadableWebSocketChannelClientWrapper>&& clientWrapper, const String& taskMode, Ref<SocketProvider>&& provider) { ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - - RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper; - - Peer* peer = Peer::create(clientWrapper, *loaderProxy, context, taskMode); - bool sent = loaderProxy->postTaskForModeToWorkerGlobalScope( - WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask::create(peer, loaderProxy, clientWrapper), taskMode); - if (!sent) { + ASSERT(context.isDocument()); + + bool sent = loaderProxy.postTaskForModeToWorkerGlobalScope({ + ScriptExecutionContext::Task::CleanupTask, + [clientWrapper = clientWrapper.copyRef(), &loaderProxy, peer = std::make_unique<Peer>(clientWrapper.copyRef(), loaderProxy, context, taskMode, WTFMove(provider))](ScriptExecutionContext& context) mutable { + ASSERT_UNUSED(context, context.isWorkerGlobalScope()); + if (clientWrapper->failedWebSocketChannelCreation()) { + // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer. + loaderProxy.postTaskToLoader([peer = WTFMove(peer)](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + }); + } else + clientWrapper->didCreateWebSocketChannel(peer.release()); + } + }, taskMode); + + if (!sent) clientWrapper->clearPeer(); - delete peer; - } } void WorkerThreadableWebSocketChannel::Bridge::initialize() { ASSERT(!m_peer); setMethodNotCompleted(); - Ref<Bridge> protect(*this); - m_loaderProxy.postTaskToLoader( - createCallbackTask(&Bridge::mainThreadInitialize, - AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, m_taskMode)); + Ref<Bridge> protectedThis(*this); + + m_loaderProxy.postTaskToLoader([&loaderProxy = m_loaderProxy, workerClientWrapper = m_workerClientWrapper.copyRef(), taskMode = m_taskMode.isolatedCopy(), provider = m_socketProvider.copyRef()](ScriptExecutionContext& context) mutable { + mainThreadInitialize(context, loaderProxy, WTFMove(workerClientWrapper), taskMode, WTFMove(provider)); + }); waitForMethodCompletion(); + // m_peer may be null when the nested runloop exited before a peer has created. m_peer = m_workerClientWrapper->peer(); if (!m_peer) m_workerClientWrapper->setFailedWebSocketChannelCreation(); } -void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer, const URL& url, const String& protocol) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); - - peer->connect(url, protocol); -} - void WorkerThreadableWebSocketChannel::Bridge::connect(const URL& url, const String& protocol) { - ASSERT(m_workerClientWrapper); if (!m_peer) return; - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol)); -} -void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); + m_loaderProxy.postTaskToLoader([peer = m_peer, url = url.isolatedCopy(), protocol = protocol.isolatedCopy()](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); - peer->send(message); -} - -void WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer(ScriptExecutionContext* context, Peer* peer, PassOwnPtr<Vector<char>> data) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); - - RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size()); - peer->send(*arrayBuffer); -} - -void WorkerThreadableWebSocketChannel::mainThreadSendBlob(ScriptExecutionContext* context, Peer* peer, const URL& url, const String& type, long long size) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); - - RefPtr<Blob> blob = Blob::create(url, type, size); - peer->send(*blob); + peer->connect(url, protocol); + }); } ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message) { - if (!m_workerClientWrapper || !m_peer) + if (!m_peer) return ThreadableWebSocketChannel::SendFail; setMethodNotCompleted(); - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message)); - Ref<Bridge> protect(*this); + + m_loaderProxy.postTaskToLoader([peer = m_peer, message = message.isolatedCopy()](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); + + peer->send(message); + }); + + Ref<Bridge> protectedThis(*this); waitForMethodCompletion(); - ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); - if (!clientWrapper) - return ThreadableWebSocketChannel::SendFail; - return clientWrapper->sendRequestResult(); + return m_workerClientWrapper->sendRequestResult(); } ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) { - if (!m_workerClientWrapper || !m_peer) + if (!m_peer) return ThreadableWebSocketChannel::SendFail; + // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>. - OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>(byteLength)); + Vector<char> data(byteLength); if (binaryData.byteLength()) - memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength); + memcpy(data.data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength); setMethodNotCompleted(); - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer, AllowCrossThreadAccess(m_peer), data.release())); - Ref<Bridge> protect(*this); + + m_loaderProxy.postTaskToLoader([peer = m_peer, data = WTFMove(data)](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); + + auto arrayBuffer = ArrayBuffer::create(data.data(), data.size()); + peer->send(arrayBuffer); + }); + + Ref<Bridge> protectedThis(*this); waitForMethodCompletion(); - ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); - if (!clientWrapper) - return ThreadableWebSocketChannel::SendFail; - return clientWrapper->sendRequestResult(); + return m_workerClientWrapper->sendRequestResult(); } -ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const Blob& binaryData) +ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(Blob& binaryData) { - if (!m_workerClientWrapper || !m_peer) + if (!m_peer) return ThreadableWebSocketChannel::SendFail; setMethodNotCompleted(); - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendBlob, AllowCrossThreadAccess(m_peer), binaryData.url(), binaryData.type(), binaryData.size())); - Ref<Bridge> protect(*this); - waitForMethodCompletion(); - ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); - if (!clientWrapper) - return ThreadableWebSocketChannel::SendFail; - return clientWrapper->sendRequestResult(); -} -void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); + m_loaderProxy.postTaskToLoader([peer = m_peer, url = binaryData.url().isolatedCopy(), type = binaryData.type().isolatedCopy(), size = binaryData.size()](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); - peer->bufferedAmount(); + peer->send(Blob::deserialize(url, type, size, { })); + }); + + Ref<Bridge> protectedThis(*this); + waitForMethodCompletion(); + return m_workerClientWrapper->sendRequestResult(); } -unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() +unsigned WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() { - if (!m_workerClientWrapper || !m_peer) + if (!m_peer) return 0; setMethodNotCompleted(); - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer))); - Ref<Bridge> protect(*this); - waitForMethodCompletion(); - ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); - if (clientWrapper) - return clientWrapper->bufferedAmount(); - return 0; -} -void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer, int code, const String& reason) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); + m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); - peer->close(code, reason); + peer->bufferedAmount(); + }); + + Ref<Bridge> protectedThis(*this); + waitForMethodCompletion(); + return m_workerClientWrapper->bufferedAmount(); } void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason) { if (!m_peer) return; - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer), code, reason)); -} -void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); + m_loaderProxy.postTaskToLoader([peer = m_peer, code, reason = reason.isolatedCopy()](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); - peer->fail(reason); + peer->close(code, reason); + }); } void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason) { if (!m_peer) return; - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason)); -} -void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, PassOwnPtr<Peer> peer) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT_UNUSED(peer, peer); + m_loaderProxy.postTaskToLoader([peer = m_peer, reason = reason.isolatedCopy()](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); - // Peer object will be deleted even if the task does not run in the main thread's cleanup period, because - // the destructor for the task object (created by createCallbackTask()) will automatically delete the peer. + peer->fail(reason); + }); } void WorkerThreadableWebSocketChannel::Bridge::disconnect() { clearClientWrapper(); if (m_peer) { - OwnPtr<Peer> peer = adoptPtr(m_peer); - m_peer = 0; - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release())); + m_loaderProxy.postTaskToLoader([peer = std::unique_ptr<Peer>(m_peer)](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + }); + m_peer = nullptr; } - m_workerGlobalScope = 0; -} - -void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); - - peer->suspend(); + m_workerGlobalScope = nullptr; } void WorkerThreadableWebSocketChannel::Bridge::suspend() { if (!m_peer) return; - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer))); -} -void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer) -{ - ASSERT(isMainThread()); - ASSERT_UNUSED(context, context->isDocument()); - ASSERT(peer); + m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); - peer->resume(); + peer->suspend(); + }); } void WorkerThreadableWebSocketChannel::Bridge::resume() { if (!m_peer) return; - m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer))); + + m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) { + ASSERT(isMainThread()); + ASSERT_UNUSED(context, context.isDocument()); + ASSERT(peer); + + peer->resume(); + }); } void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() @@ -635,7 +568,6 @@ void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted() { - ASSERT(m_workerClientWrapper); m_workerClientWrapper->clearSyncMethodDone(); } @@ -645,12 +577,12 @@ void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion() { if (!m_workerGlobalScope) return; - WorkerRunLoop& runLoop = m_workerGlobalScope->thread()->runLoop(); + WorkerRunLoop& runLoop = m_workerGlobalScope->thread().runLoop(); MessageQueueWaitResult result = MessageQueueMessageReceived; - ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); + ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.ptr(); while (m_workerGlobalScope && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) { result = runLoop.runInMode(m_workerGlobalScope.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerGlobalScope become null. - clientWrapper = m_workerClientWrapper.get(); + clientWrapper = m_workerClientWrapper.ptr(); } } |