summaryrefslogtreecommitdiff
path: root/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp')
-rw-r--r--Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp474
1 files changed, 203 insertions, 271 deletions
diff --git a/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp b/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp
index 65b8d2bc1..f0d86b615 100644
--- a/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp
+++ b/Source/WebCore/Modules/websockets/WorkerThreadableWebSocketChannel.cpp
@@ -35,9 +35,9 @@
#include "WorkerThreadableWebSocketChannel.h"
#include "Blob.h"
-#include "CrossThreadTask.h"
#include "Document.h"
#include "ScriptExecutionContext.h"
+#include "SocketProvider.h"
#include "ThreadableWebSocketChannelClientWrapper.h"
#include "WebSocketChannel.h"
#include "WebSocketChannelClient.h"
@@ -47,15 +47,15 @@
#include "WorkerThread.h"
#include <runtime/ArrayBuffer.h>
#include <wtf/MainThread.h>
-#include <wtf/PassRefPtr.h>
#include <wtf/text/WTFString.h>
namespace WebCore {
-WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope* context, WebSocketChannelClient* client, const String& taskMode)
+WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& context, WebSocketChannelClient& client, const String& taskMode, SocketProvider& provider)
: m_workerGlobalScope(context)
, m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client))
- , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope, taskMode))
+ , m_bridge(Bridge::create(m_workerClientWrapper.copyRef(), m_workerGlobalScope.copyRef(), taskMode, provider))
+ , m_socketProvider(provider)
{
m_bridge->initialize();
}
@@ -74,13 +74,11 @@ void WorkerThreadableWebSocketChannel::connect(const URL& url, const String& pro
String WorkerThreadableWebSocketChannel::subprotocol()
{
- ASSERT(m_workerClientWrapper);
return m_workerClientWrapper->subprotocol();
}
String WorkerThreadableWebSocketChannel::extensions()
{
- ASSERT(m_workerClientWrapper);
return m_workerClientWrapper->extensions();
}
@@ -98,14 +96,14 @@ ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(co
return m_bridge->send(binaryData, byteOffset, byteLength);
}
-ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const Blob& binaryData)
+ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(Blob& binaryData)
{
if (!m_bridge)
return ThreadableWebSocketChannel::SendFail;
return m_bridge->send(binaryData);
}
-unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
+unsigned WorkerThreadableWebSocketChannel::bufferedAmount() const
{
if (!m_bridge)
return 0;
@@ -127,7 +125,7 @@ void WorkerThreadableWebSocketChannel::fail(const String& reason)
void WorkerThreadableWebSocketChannel::disconnect()
{
m_bridge->disconnect();
- m_bridge.clear();
+ m_bridge = nullptr;
}
void WorkerThreadableWebSocketChannel::suspend()
@@ -144,10 +142,10 @@ void WorkerThreadableWebSocketChannel::resume()
m_bridge->resume();
}
-WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode)
- : m_workerClientWrapper(clientWrapper)
+WorkerThreadableWebSocketChannel::Peer::Peer(Ref<ThreadableWebSocketChannelClientWrapper>&& clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext& context, const String& taskMode, SocketProvider& provider)
+ : m_workerClientWrapper(WTFMove(clientWrapper))
, m_loaderProxy(loaderProxy)
- , m_mainWebSocketChannel(WebSocketChannel::create(toDocument(context), this))
+ , m_mainWebSocketChannel(WebSocketChannel::create(downcast<Document>(context), *this, provider))
, m_taskMode(taskMode)
{
ASSERT(isMainThread());
@@ -168,52 +166,53 @@ void WorkerThreadableWebSocketChannel::Peer::connect(const URL& url, const Strin
m_mainWebSocketChannel->connect(url, protocol);
}
-static void workerGlobalScopeDidSend(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, ThreadableWebSocketChannel::SendResult sendRequestResult)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->setSendRequestResult(sendRequestResult);
-}
-
void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
+ if (!m_mainWebSocketChannel)
return;
+
ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable {
+ workerClientWrapper->setSendRequestResult(sendRequestResult);
+ }, m_taskMode);
}
void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
+ if (!m_mainWebSocketChannel)
return;
+
ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable {
+ workerClientWrapper->setSendRequestResult(sendRequestResult);
+ }, m_taskMode);
}
-void WorkerThreadableWebSocketChannel::Peer::send(const Blob& binaryData)
+void WorkerThreadableWebSocketChannel::Peer::send(Blob& binaryData)
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
+ if (!m_mainWebSocketChannel)
return;
- ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData);
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
-}
-static void workerGlobalScopeDidGetBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->setBufferedAmount(bufferedAmount);
+ ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData);
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable {
+ workerClientWrapper->setSendRequestResult(sendRequestResult);
+ }, m_taskMode);
}
void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
{
ASSERT(isMainThread());
- if (!m_mainWebSocketChannel || !m_workerClientWrapper)
+ if (!m_mainWebSocketChannel)
return;
- unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
+
+ unsigned bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), bufferedAmount](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->setBufferedAmount(bufferedAmount);
+ }, m_taskMode);
}
void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
@@ -238,7 +237,7 @@ void WorkerThreadableWebSocketChannel::Peer::disconnect()
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->disconnect();
- m_mainWebSocketChannel = 0;
+ m_mainWebSocketChannel = nullptr;
}
void WorkerThreadableWebSocketChannel::Peer::suspend()
@@ -257,101 +256,98 @@ void WorkerThreadableWebSocketChannel::Peer::resume()
m_mainWebSocketChannel->resume();
}
-static void workerGlobalScopeDidConnect(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->setSubprotocol(subprotocol);
- workerClientWrapper->setExtensions(extensions);
- workerClientWrapper->didConnect();
-}
-
void WorkerThreadableWebSocketChannel::Peer::didConnect()
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol(), m_mainWebSocketChannel->extensions()), m_taskMode);
-}
-static void workerGlobalScopeDidReceiveMessage(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didReceiveMessage(message);
+ String subprotocol = m_mainWebSocketChannel->subprotocol();
+ String extensions = m_mainWebSocketChannel->extensions();
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), subprotocol = subprotocol.isolatedCopy(), extensions = extensions.isolatedCopy()](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->setSubprotocol(subprotocol);
+ workerClientWrapper->setExtensions(extensions);
+ workerClientWrapper->didConnect();
+ }, m_taskMode);
}
void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
-}
-static void workerGlobalScopeDidReceiveBinaryData(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char>> binaryData)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didReceiveBinaryData(binaryData);
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), message = message.isolatedCopy()](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->didReceiveMessage(message);
+ }, m_taskMode);
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char>> binaryData)
+void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(Vector<uint8_t>&& binaryData)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData), m_taskMode);
-}
-static void workerGlobalScopeDidUpdateBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), binaryData = WTFMove(binaryData)](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->didReceiveBinaryData(WTFMove(binaryData));
+ }, m_taskMode);
}
-void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount)
+void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned bufferedAmount)
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidUpdateBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
-}
-static void workerGlobalScopeDidStartClosingHandshake(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didStartClosingHandshake();
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), bufferedAmount](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
+ }, m_taskMode);
}
void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper), m_taskMode);
-}
-static void workerGlobalScopeDidClose(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
-{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->didStartClosingHandshake();
+ }, m_taskMode);
}
-void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
+void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT(isMainThread());
- m_mainWebSocketChannel = 0;
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason), m_taskMode);
+ m_mainWebSocketChannel = nullptr;
+
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), unhandledBufferedAmount, closingHandshakeCompletion, code, reason = reason.isolatedCopy()](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
+ }, m_taskMode);
}
-static void workerGlobalScopeDidReceiveMessageError(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
+void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
{
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- workerClientWrapper->didReceiveMessageError();
+ ASSERT(isMainThread());
+
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->didReceiveMessageError();
+ }, m_taskMode);
}
-void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
+void WorkerThreadableWebSocketChannel::Peer::didUpgradeURL()
{
ASSERT(isMainThread());
- m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper), m_taskMode);
+
+ m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ workerClientWrapper->didUpgradeURL();
+ }, m_taskMode);
}
-WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerGlobalScope> workerGlobalScope, const String& taskMode)
- : m_workerClientWrapper(workerClientWrapper)
- , m_workerGlobalScope(workerGlobalScope)
- , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
+WorkerThreadableWebSocketChannel::Bridge::Bridge(Ref<ThreadableWebSocketChannelClientWrapper>&& workerClientWrapper, Ref<WorkerGlobalScope>&& workerGlobalScope, const String& taskMode, Ref<SocketProvider>&& socketProvider)
+ : m_workerClientWrapper(WTFMove(workerClientWrapper))
+ , m_workerGlobalScope(WTFMove(workerGlobalScope))
+ , m_loaderProxy(m_workerGlobalScope->thread().workerLoaderProxy())
, m_taskMode(taskMode)
- , m_peer(0)
+ , m_socketProvider(WTFMove(socketProvider))
{
- ASSERT(m_workerClientWrapper.get());
}
WorkerThreadableWebSocketChannel::Bridge::~Bridge()
@@ -359,273 +355,210 @@ WorkerThreadableWebSocketChannel::Bridge::~Bridge()
disconnect();
}
-class WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask : public ScriptExecutionContext::Task {
-public:
- static PassOwnPtr<ScriptExecutionContext::Task> create(WorkerThreadableWebSocketChannel::Peer* peer,
- WorkerLoaderProxy* loaderProxy,
- PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
- {
- return adoptPtr(new WorkerGlobalScopeDidInitializeTask(peer, loaderProxy, workerClientWrapper));
- }
-
- virtual ~WorkerGlobalScopeDidInitializeTask() { }
- virtual void performTask(ScriptExecutionContext* context) override
- {
- ASSERT_UNUSED(context, context->isWorkerGlobalScope());
- if (m_workerClientWrapper->failedWebSocketChannelCreation()) {
- // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer.
- OwnPtr<WorkerThreadableWebSocketChannel::Peer> peer = adoptPtr(m_peer);
- m_peer = 0;
- m_loaderProxy->postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
- } else
- m_workerClientWrapper->didCreateWebSocketChannel(m_peer);
- }
- virtual bool isCleanupTask() const override { return true; }
-
-private:
- WorkerGlobalScopeDidInitializeTask(WorkerThreadableWebSocketChannel::Peer* peer,
- WorkerLoaderProxy* loaderProxy,
- PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
- : m_peer(peer)
- , m_loaderProxy(loaderProxy)
- , m_workerClientWrapper(workerClientWrapper)
- {
- }
-
- WorkerThreadableWebSocketChannel::Peer* m_peer;
- WorkerLoaderProxy* m_loaderProxy;
- RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper;
-};
-
-void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext* context, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode)
+void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext& context, WorkerLoaderProxy& loaderProxy, Ref<ThreadableWebSocketChannelClientWrapper>&& clientWrapper, const String& taskMode, Ref<SocketProvider>&& provider)
{
ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
-
- RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
-
- Peer* peer = Peer::create(clientWrapper, *loaderProxy, context, taskMode);
- bool sent = loaderProxy->postTaskForModeToWorkerGlobalScope(
- WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask::create(peer, loaderProxy, clientWrapper), taskMode);
- if (!sent) {
+ ASSERT(context.isDocument());
+
+ bool sent = loaderProxy.postTaskForModeToWorkerGlobalScope({
+ ScriptExecutionContext::Task::CleanupTask,
+ [clientWrapper = clientWrapper.copyRef(), &loaderProxy, peer = std::make_unique<Peer>(clientWrapper.copyRef(), loaderProxy, context, taskMode, WTFMove(provider))](ScriptExecutionContext& context) mutable {
+ ASSERT_UNUSED(context, context.isWorkerGlobalScope());
+ if (clientWrapper->failedWebSocketChannelCreation()) {
+ // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer.
+ loaderProxy.postTaskToLoader([peer = WTFMove(peer)](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ });
+ } else
+ clientWrapper->didCreateWebSocketChannel(peer.release());
+ }
+ }, taskMode);
+
+ if (!sent)
clientWrapper->clearPeer();
- delete peer;
- }
}
void WorkerThreadableWebSocketChannel::Bridge::initialize()
{
ASSERT(!m_peer);
setMethodNotCompleted();
- Ref<Bridge> protect(*this);
- m_loaderProxy.postTaskToLoader(
- createCallbackTask(&Bridge::mainThreadInitialize,
- AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, m_taskMode));
+ Ref<Bridge> protectedThis(*this);
+
+ m_loaderProxy.postTaskToLoader([&loaderProxy = m_loaderProxy, workerClientWrapper = m_workerClientWrapper.copyRef(), taskMode = m_taskMode.isolatedCopy(), provider = m_socketProvider.copyRef()](ScriptExecutionContext& context) mutable {
+ mainThreadInitialize(context, loaderProxy, WTFMove(workerClientWrapper), taskMode, WTFMove(provider));
+ });
waitForMethodCompletion();
+
// m_peer may be null when the nested runloop exited before a peer has created.
m_peer = m_workerClientWrapper->peer();
if (!m_peer)
m_workerClientWrapper->setFailedWebSocketChannelCreation();
}
-void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer, const URL& url, const String& protocol)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- peer->connect(url, protocol);
-}
-
void WorkerThreadableWebSocketChannel::Bridge::connect(const URL& url, const String& protocol)
{
- ASSERT(m_workerClientWrapper);
if (!m_peer)
return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol));
-}
-void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
+ m_loaderProxy.postTaskToLoader([peer = m_peer, url = url.isolatedCopy(), protocol = protocol.isolatedCopy()](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
- peer->send(message);
-}
-
-void WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer(ScriptExecutionContext* context, Peer* peer, PassOwnPtr<Vector<char>> data)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size());
- peer->send(*arrayBuffer);
-}
-
-void WorkerThreadableWebSocketChannel::mainThreadSendBlob(ScriptExecutionContext* context, Peer* peer, const URL& url, const String& type, long long size)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- RefPtr<Blob> blob = Blob::create(url, type, size);
- peer->send(*blob);
+ peer->connect(url, protocol);
+ });
}
ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
{
- if (!m_workerClientWrapper || !m_peer)
+ if (!m_peer)
return ThreadableWebSocketChannel::SendFail;
setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
- Ref<Bridge> protect(*this);
+
+ m_loaderProxy.postTaskToLoader([peer = m_peer, message = message.isolatedCopy()](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
+
+ peer->send(message);
+ });
+
+ Ref<Bridge> protectedThis(*this);
waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (!clientWrapper)
- return ThreadableWebSocketChannel::SendFail;
- return clientWrapper->sendRequestResult();
+ return m_workerClientWrapper->sendRequestResult();
}
ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
- if (!m_workerClientWrapper || !m_peer)
+ if (!m_peer)
return ThreadableWebSocketChannel::SendFail;
+
// ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
- OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>(byteLength));
+ Vector<char> data(byteLength);
if (binaryData.byteLength())
- memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
+ memcpy(data.data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer, AllowCrossThreadAccess(m_peer), data.release()));
- Ref<Bridge> protect(*this);
+
+ m_loaderProxy.postTaskToLoader([peer = m_peer, data = WTFMove(data)](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
+
+ auto arrayBuffer = ArrayBuffer::create(data.data(), data.size());
+ peer->send(arrayBuffer);
+ });
+
+ Ref<Bridge> protectedThis(*this);
waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (!clientWrapper)
- return ThreadableWebSocketChannel::SendFail;
- return clientWrapper->sendRequestResult();
+ return m_workerClientWrapper->sendRequestResult();
}
-ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const Blob& binaryData)
+ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(Blob& binaryData)
{
- if (!m_workerClientWrapper || !m_peer)
+ if (!m_peer)
return ThreadableWebSocketChannel::SendFail;
setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendBlob, AllowCrossThreadAccess(m_peer), binaryData.url(), binaryData.type(), binaryData.size()));
- Ref<Bridge> protect(*this);
- waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (!clientWrapper)
- return ThreadableWebSocketChannel::SendFail;
- return clientWrapper->sendRequestResult();
-}
-void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
+ m_loaderProxy.postTaskToLoader([peer = m_peer, url = binaryData.url().isolatedCopy(), type = binaryData.type().isolatedCopy(), size = binaryData.size()](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
- peer->bufferedAmount();
+ peer->send(Blob::deserialize(url, type, size, { }));
+ });
+
+ Ref<Bridge> protectedThis(*this);
+ waitForMethodCompletion();
+ return m_workerClientWrapper->sendRequestResult();
}
-unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
+unsigned WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
{
- if (!m_workerClientWrapper || !m_peer)
+ if (!m_peer)
return 0;
setMethodNotCompleted();
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
- Ref<Bridge> protect(*this);
- waitForMethodCompletion();
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
- if (clientWrapper)
- return clientWrapper->bufferedAmount();
- return 0;
-}
-void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer, int code, const String& reason)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
+ m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
- peer->close(code, reason);
+ peer->bufferedAmount();
+ });
+
+ Ref<Bridge> protectedThis(*this);
+ waitForMethodCompletion();
+ return m_workerClientWrapper->bufferedAmount();
}
void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
{
if (!m_peer)
return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer), code, reason));
-}
-void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
+ m_loaderProxy.postTaskToLoader([peer = m_peer, code, reason = reason.isolatedCopy()](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
- peer->fail(reason);
+ peer->close(code, reason);
+ });
}
void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason)
{
if (!m_peer)
return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason));
-}
-void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, PassOwnPtr<Peer> peer)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT_UNUSED(peer, peer);
+ m_loaderProxy.postTaskToLoader([peer = m_peer, reason = reason.isolatedCopy()](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
- // Peer object will be deleted even if the task does not run in the main thread's cleanup period, because
- // the destructor for the task object (created by createCallbackTask()) will automatically delete the peer.
+ peer->fail(reason);
+ });
}
void WorkerThreadableWebSocketChannel::Bridge::disconnect()
{
clearClientWrapper();
if (m_peer) {
- OwnPtr<Peer> peer = adoptPtr(m_peer);
- m_peer = 0;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
+ m_loaderProxy.postTaskToLoader([peer = std::unique_ptr<Peer>(m_peer)](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ });
+ m_peer = nullptr;
}
- m_workerGlobalScope = 0;
-}
-
-void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
-
- peer->suspend();
+ m_workerGlobalScope = nullptr;
}
void WorkerThreadableWebSocketChannel::Bridge::suspend()
{
if (!m_peer)
return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
-}
-void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
-{
- ASSERT(isMainThread());
- ASSERT_UNUSED(context, context->isDocument());
- ASSERT(peer);
+ m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
- peer->resume();
+ peer->suspend();
+ });
}
void WorkerThreadableWebSocketChannel::Bridge::resume()
{
if (!m_peer)
return;
- m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
+
+ m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) {
+ ASSERT(isMainThread());
+ ASSERT_UNUSED(context, context.isDocument());
+ ASSERT(peer);
+
+ peer->resume();
+ });
}
void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
@@ -635,7 +568,6 @@ void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
{
- ASSERT(m_workerClientWrapper);
m_workerClientWrapper->clearSyncMethodDone();
}
@@ -645,12 +577,12 @@ void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
{
if (!m_workerGlobalScope)
return;
- WorkerRunLoop& runLoop = m_workerGlobalScope->thread()->runLoop();
+ WorkerRunLoop& runLoop = m_workerGlobalScope->thread().runLoop();
MessageQueueWaitResult result = MessageQueueMessageReceived;
- ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
+ ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.ptr();
while (m_workerGlobalScope && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
result = runLoop.runInMode(m_workerGlobalScope.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerGlobalScope become null.
- clientWrapper = m_workerClientWrapper.get();
+ clientWrapper = m_workerClientWrapper.ptr();
}
}