diff options
author | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
---|---|---|
committer | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
commit | 1bf1084f2b10c3b47fd1a588d85d21ed0eb41d0c (patch) | |
tree | 46dcd36c86e7fbc6e5df36deb463b33e9967a6f7 /Source/WebKit2/Platform/IPC/Connection.cpp | |
parent | 32761a6cee1d0dee366b885b7b9c777e67885688 (diff) | |
download | WebKitGtk-tarball-master.tar.gz |
webkitgtk-2.16.5HEADwebkitgtk-2.16.5master
Diffstat (limited to 'Source/WebKit2/Platform/IPC/Connection.cpp')
-rw-r--r-- | Source/WebKit2/Platform/IPC/Connection.cpp | 748 |
1 files changed, 442 insertions, 306 deletions
diff --git a/Source/WebKit2/Platform/IPC/Connection.cpp b/Source/WebKit2/Platform/IPC/Connection.cpp index 615e707ba..39a504b9b 100644 --- a/Source/WebKit2/Platform/IPC/Connection.cpp +++ b/Source/WebKit2/Platform/IPC/Connection.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2010 Apple Inc. All rights reserved. + * Copyright (C) 2010-2016 Apple Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -26,6 +26,8 @@ #include "config.h" #include "Connection.h" +#include "Logging.h" +#include <memory> #include <wtf/CurrentTime.h> #include <wtf/HashSet.h> #include <wtf/NeverDestroyed.h> @@ -33,121 +35,116 @@ #include <wtf/text/WTFString.h> #include <wtf/threads/BinarySemaphore.h> +#if PLATFORM(COCOA) +#include "MachMessage.h" +#endif + +#if USE(UNIX_DOMAIN_SOCKETS) +#include "UnixMessage.h" +#endif + namespace IPC { -class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> { +struct Connection::ReplyHandler { + RefPtr<FunctionDispatcher> dispatcher; + Function<void (std::unique_ptr<Decoder>)> handler; +}; + +struct Connection::WaitForMessageState { + WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, OptionSet<WaitForOption> waitForOptions) + : messageReceiverName(messageReceiverName) + , messageName(messageName) + , destinationID(destinationID) + , waitForOptions(waitForOptions) + { + } + + StringReference messageReceiverName; + StringReference messageName; + uint64_t destinationID; + + OptionSet<WaitForOption> waitForOptions; + bool messageWaitingInterrupted = false; + + std::unique_ptr<Decoder> decoder; +}; + +class Connection::SyncMessageState { public: - static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*); - ~SyncMessageState(); + static SyncMessageState& singleton(); + + SyncMessageState(); + ~SyncMessageState() = delete; void wakeUpClientRunLoop() { m_waitForSyncReplySemaphore.signal(); } - bool wait(double absoluteTime) + bool wait(TimeWithDynamicClockType absoluteTime) { return m_waitForSyncReplySemaphore.wait(absoluteTime); } // Returns true if this message will be handled on a client thread that is currently // waiting for a reply to a synchronous message. - bool processIncomingMessage(Connection*, std::unique_ptr<MessageDecoder>&); + bool processIncomingMessage(Connection&, std::unique_ptr<Decoder>&); // Dispatch pending sync messages. if allowedConnection is not null, will only dispatch messages // from that connection and put the other messages back in the queue. void dispatchMessages(Connection* allowedConnection); private: - explicit SyncMessageState(RunLoop*); - - typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap; - static SyncMessageStateMap& syncMessageStateMap() - { - static NeverDestroyed<SyncMessageStateMap> syncMessageStateMap; - return syncMessageStateMap; - } + void dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection&); - static Mutex& syncMessageStateMapMutex() - { - static NeverDestroyed<Mutex> syncMessageStateMapMutex; - return syncMessageStateMapMutex; - } - - void dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection*); - - RunLoop* m_runLoop; BinarySemaphore m_waitForSyncReplySemaphore; // Protects m_didScheduleDispatchMessagesWorkSet and m_messagesToDispatchWhileWaitingForSyncReply. - Mutex m_mutex; + Lock m_mutex; // The set of connections for which we've scheduled a call to dispatchMessageAndResetDidScheduleDispatchMessagesForConnection. HashSet<RefPtr<Connection>> m_didScheduleDispatchMessagesWorkSet; struct ConnectionAndIncomingMessage { - RefPtr<Connection> connection; - std::unique_ptr<MessageDecoder> message; + Ref<Connection> connection; + std::unique_ptr<Decoder> message; }; Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply; }; -class Connection::SecondaryThreadPendingSyncReply { -public: - // The reply decoder, will be null if there was an error processing the sync message on the other side. - std::unique_ptr<MessageDecoder> replyDecoder; - - BinarySemaphore semaphore; -}; - - -PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop) +Connection::SyncMessageState& Connection::SyncMessageState::singleton() { - MutexLocker locker(syncMessageStateMapMutex()); - SyncMessageStateMap::AddResult result = syncMessageStateMap().add(runLoop, nullptr); - - if (!result.isNewEntry) { - ASSERT(result.iterator->value); - return result.iterator->value; - } - - RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop)); - result.iterator->value = syncMessageState.get(); + static std::once_flag onceFlag; + static LazyNeverDestroyed<SyncMessageState> syncMessageState; - return syncMessageState.release(); -} + std::call_once(onceFlag, [] { + syncMessageState.construct(); + }); -Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop) - : m_runLoop(runLoop) -{ + return syncMessageState; } -Connection::SyncMessageState::~SyncMessageState() +Connection::SyncMessageState::SyncMessageState() { - MutexLocker locker(syncMessageStateMapMutex()); - - ASSERT(syncMessageStateMap().contains(m_runLoop)); - syncMessageStateMap().remove(m_runLoop); - - ASSERT(m_messagesToDispatchWhileWaitingForSyncReply.isEmpty()); } -bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, std::unique_ptr<MessageDecoder>& message) +bool Connection::SyncMessageState::processIncomingMessage(Connection& connection, std::unique_ptr<Decoder>& message) { if (!message->shouldDispatchMessageWhenWaitingForSyncReply()) return false; - ConnectionAndIncomingMessage connectionAndIncomingMessage; - connectionAndIncomingMessage.connection = connection; - connectionAndIncomingMessage.message = std::move(message); + ConnectionAndIncomingMessage connectionAndIncomingMessage { connection, WTFMove(message) }; { - MutexLocker locker(m_mutex); + std::lock_guard<Lock> lock(m_mutex); - if (m_didScheduleDispatchMessagesWorkSet.add(connection).isNewEntry) - m_runLoop->dispatch(bind(&SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection, this, RefPtr<Connection>(connection))); + if (m_didScheduleDispatchMessagesWorkSet.add(&connection).isNewEntry) { + RunLoop::main().dispatch([this, protectedConnection = Ref<Connection>(connection)]() mutable { + dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(protectedConnection); + }); + } - m_messagesToDispatchWhileWaitingForSyncReply.append(std::move(connectionAndIncomingMessage)); + m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(connectionAndIncomingMessage)); } wakeUpClientRunLoop(); @@ -157,12 +154,12 @@ bool Connection::SyncMessageState::processIncomingMessage(Connection* connection void Connection::SyncMessageState::dispatchMessages(Connection* allowedConnection) { - ASSERT(m_runLoop == RunLoop::current()); + ASSERT(RunLoop::isMain()); Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply; { - MutexLocker locker(m_mutex); + std::lock_guard<Lock> lock(m_mutex); m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply); } @@ -171,46 +168,66 @@ void Connection::SyncMessageState::dispatchMessages(Connection* allowedConnectio for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) { ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i]; - if (allowedConnection && allowedConnection != connectionAndIncomingMessage.connection) { + if (allowedConnection && allowedConnection != connectionAndIncomingMessage.connection.ptr()) { // This incoming message belongs to another connection and we don't want to dispatch it now // so mark it to be put back in the message queue. - messagesToPutBack.append(std::move(connectionAndIncomingMessage)); + messagesToPutBack.append(WTFMove(connectionAndIncomingMessage)); continue; } - connectionAndIncomingMessage.connection->dispatchMessage(std::move(connectionAndIncomingMessage.message)); + connectionAndIncomingMessage.connection->dispatchMessage(WTFMove(connectionAndIncomingMessage.message)); } if (!messagesToPutBack.isEmpty()) { - MutexLocker locker(m_mutex); + std::lock_guard<Lock> lock(m_mutex); for (auto& message : messagesToPutBack) - m_messagesToDispatchWhileWaitingForSyncReply.append(std::move(message)); + m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(message)); } } -void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection* connection) +void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection& connection) { { - MutexLocker locker(m_mutex); - ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(connection)); - m_didScheduleDispatchMessagesWorkSet.remove(connection); + std::lock_guard<Lock> lock(m_mutex); + ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(&connection)); + m_didScheduleDispatchMessagesWorkSet.remove(&connection); } - dispatchMessages(connection); + dispatchMessages(&connection); } -PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop) +// Represents a sync request for which we're waiting on a reply. +struct Connection::PendingSyncReply { + // The request ID. + uint64_t syncRequestID { 0 }; + + // The reply decoder, will be null if there was an error processing the sync + // message on the other side. + std::unique_ptr<Decoder> replyDecoder; + + // Will be set to true once a reply has been received. + bool didReceiveReply { false }; + + PendingSyncReply() = default; + + explicit PendingSyncReply(uint64_t syncRequestID) + : syncRequestID(syncRequestID) + { + } +}; + +Ref<Connection> Connection::createServerConnection(Identifier identifier, Client& client) { - return adoptRef(new Connection(identifier, true, client, clientRunLoop)); + return adoptRef(*new Connection(identifier, true, client)); } -PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop) +Ref<Connection> Connection::createClientConnection(Identifier identifier, Client& client) { - return adoptRef(new Connection(identifier, false, client, clientRunLoop)); + return adoptRef(*new Connection(identifier, false, client)); } -Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop) +Connection::Connection(Identifier identifier, bool isServer, Client& client) : m_client(client) , m_isServer(isServer) , m_syncRequestID(0) @@ -219,17 +236,21 @@ Connection::Connection(Identifier identifier, bool isServer, Client* client, Run , m_didCloseOnConnectionWorkQueueCallback(0) , m_isConnected(false) , m_connectionQueue(WorkQueue::create("com.apple.IPC.ReceiveQueue")) - , m_clientRunLoop(clientRunLoop) , m_inSendSyncCount(0) , m_inDispatchMessageCount(0) , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0) , m_didReceiveInvalidMessage(false) - , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop)) + , m_waitingForMessage(nullptr) , m_shouldWaitForSyncReplies(true) { - ASSERT(m_client); + ASSERT(RunLoop::isMain()); platformInitialize(identifier); + +#if HAVE(QOS_CLASSES) + ASSERT(pthread_main_np()); + m_mainThread = pthread_self(); +#endif } Connection::~Connection() @@ -251,63 +272,52 @@ void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMess m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure; } -void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue* workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver) +void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue& workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver) { - ASSERT(RunLoop::current() == m_clientRunLoop); - ASSERT(!m_isConnected); + ASSERT(RunLoop::isMain()); - m_connectionQueue->dispatch(bind(&Connection::addWorkQueueMessageReceiverOnConnectionWorkQueue, this, messageReceiverName, RefPtr<WorkQueue>(workQueue), RefPtr<WorkQueueMessageReceiver>(workQueueMessageReceiver))); -} + m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), workQueue = &workQueue, workQueueMessageReceiver]() mutable { + ASSERT(!protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName)); -void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName) -{ - ASSERT(RunLoop::current() == m_clientRunLoop); - - m_connectionQueue->dispatch(bind(&Connection::removeWorkQueueMessageReceiverOnConnectionWorkQueue, this, messageReceiverName)); + protectedThis->m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(workQueue, workQueueMessageReceiver)); + }); } -void Connection::addWorkQueueMessageReceiverOnConnectionWorkQueue(StringReference messageReceiverName, WorkQueue* workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver) +void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName) { - ASSERT(workQueue); - ASSERT(workQueueMessageReceiver); - ASSERT(!m_workQueueMessageReceivers.contains(messageReceiverName)); - - m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(workQueue, workQueueMessageReceiver)); -} + ASSERT(RunLoop::isMain()); -void Connection::removeWorkQueueMessageReceiverOnConnectionWorkQueue(StringReference messageReceiverName) -{ - ASSERT(m_workQueueMessageReceivers.contains(messageReceiverName)); - m_workQueueMessageReceivers.remove(messageReceiverName); + m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName)]() mutable { + ASSERT(protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName)); + protectedThis->m_workQueueMessageReceivers.remove(messageReceiverName); + }); } -void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver* workQueueMessageReceiver, MessageDecoder* incomingMessageDecoder) +void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver& workQueueMessageReceiver, Decoder& decoder) { - OwnPtr<MessageDecoder> decoder = adoptPtr(incomingMessageDecoder); - - if (!decoder->isSyncMessage()) { - workQueueMessageReceiver->didReceiveMessage(this, *decoder); + if (!decoder.isSyncMessage()) { + workQueueMessageReceiver.didReceiveMessage(*this, decoder); return; } uint64_t syncRequestID = 0; - if (!decoder->decode(syncRequestID) || !syncRequestID) { + if (!decoder.decode(syncRequestID) || !syncRequestID) { // We received an invalid sync message. // FIXME: Handle this. - decoder->markInvalid(); + decoder.markInvalid(); return; } - auto replyEncoder = std::make_unique<MessageEncoder>("IPC", "SyncMessageReply", syncRequestID); + auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID); // Hand off both the decoder and encoder to the work queue message receiver. - workQueueMessageReceiver->didReceiveSyncMessage(this, *decoder, replyEncoder); + workQueueMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder); // FIXME: If the message was invalid, we should send back a SyncMessageError. - ASSERT(!decoder->isInvalid()); + ASSERT(!decoder.isInvalid()); if (replyEncoder) - sendSyncReply(std::move(replyEncoder)); + sendSyncReply(WTFMove(replyEncoder)); } void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback) @@ -319,15 +329,29 @@ void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWo void Connection::invalidate() { + ASSERT(RunLoop::isMain()); + if (!isValid()) { // Someone already called invalidate(). return; } - // Reset the client. - m_client = 0; + m_isValid = false; - m_connectionQueue->dispatch(WTF::bind(&Connection::platformInvalidate, this)); + { + std::lock_guard<Lock> lock(m_replyHandlersLock); + for (auto& replyHandler : m_replyHandlers.values()) { + replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] { + handler(nullptr); + }); + } + + m_replyHandlers.clear(); + } + + m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable { + protectedThis->platformInvalidate(); + }); } void Connection::markCurrentlyDispatchedMessageAsInvalid() @@ -338,9 +362,9 @@ void Connection::markCurrentlyDispatchedMessageAsInvalid() m_didReceiveInvalidMessage = true; } -std::unique_ptr<MessageEncoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID) +std::unique_ptr<Encoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID) { - auto encoder = std::make_unique<MessageEncoder>(messageReceiverName, messageName, destinationID); + auto encoder = std::make_unique<Encoder>(messageReceiverName, messageName, destinationID); encoder->setIsSyncMessage(true); // Encode the sync request ID. @@ -350,78 +374,126 @@ std::unique_ptr<MessageEncoder> Connection::createSyncMessageEncoder(StringRefer return encoder; } -bool Connection::sendMessage(std::unique_ptr<MessageEncoder> encoder, unsigned messageSendFlags) +bool Connection::sendMessage(std::unique_ptr<Encoder> encoder, OptionSet<SendOption> sendOptions) { if (!isValid()) return false; - if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply + if (RunLoop::isMain() && m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting && !encoder->isSyncMessage() && !(encoder->messageReceiverName() == "IPC")) { + uint64_t syncRequestID; + auto wrappedMessage = createSyncMessageEncoder("IPC", "WrappedAsyncMessageForTesting", encoder->destinationID(), syncRequestID); + wrappedMessage->setFullySynchronousModeForTesting(); + wrappedMessage->wrapForTesting(WTFMove(encoder)); + return static_cast<bool>(sendSyncMessage(syncRequestID, WTFMove(wrappedMessage), Seconds::infinity(), { })); + } + + if (sendOptions.contains(SendOption::DispatchMessageEvenWhenWaitingForSyncReply) && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount)) encoder->setShouldDispatchMessageWhenWaitingForSyncReply(true); { - MutexLocker locker(m_outgoingMessagesLock); - m_outgoingMessages.append(std::move(encoder)); + std::lock_guard<Lock> lock(m_outgoingMessagesMutex); + m_outgoingMessages.append(WTFMove(encoder)); } // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled. - m_connectionQueue->dispatch(WTF::bind(&Connection::sendOutgoingMessages, this)); + m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable { + protectedThis->sendOutgoingMessages(); + }); return true; } -bool Connection::sendSyncReply(std::unique_ptr<MessageEncoder> encoder) +void Connection::sendMessageWithReply(uint64_t requestID, std::unique_ptr<Encoder> encoder, FunctionDispatcher& replyDispatcher, Function<void (std::unique_ptr<Decoder>)>&& replyHandler) { - return sendMessage(std::move(encoder)); + { + std::lock_guard<Lock> lock(m_replyHandlersLock); + + if (!isValid()) { + replyDispatcher.dispatch([replyHandler = WTFMove(replyHandler)] { + replyHandler(nullptr); + }); + return; + } + + ASSERT(!m_replyHandlers.contains(requestID)); + m_replyHandlers.set(requestID, ReplyHandler { &replyDispatcher, WTFMove(replyHandler) }); + } + + sendMessage(WTFMove(encoder), { }); +} + +bool Connection::sendSyncReply(std::unique_ptr<Encoder> encoder) +{ + return sendMessage(WTFMove(encoder), { }); } -std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout) +Seconds Connection::timeoutRespectingIgnoreTimeoutsForTesting(Seconds timeout) const { + return m_ignoreTimeoutsForTesting ? Seconds::infinity() : timeout; +} + +std::unique_ptr<Decoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions) +{ + ASSERT(RunLoop::isMain()); + + timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout); + + bool hasIncomingSynchronousMessage = false; + // First, check if this message is already in the incoming messages queue. { - MutexLocker locker(m_incomingMessagesLock); + std::lock_guard<Lock> lock(m_incomingMessagesMutex); for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) { - std::unique_ptr<MessageDecoder>& message = *it; + std::unique_ptr<Decoder>& message = *it; if (message->messageReceiverName() == messageReceiverName && message->messageName() == messageName && message->destinationID() == destinationID) { - std::unique_ptr<MessageDecoder> returnedMessage = std::move(message); + std::unique_ptr<Decoder> returnedMessage = WTFMove(message); m_incomingMessages.remove(it); return returnedMessage; } + + if (message->isSyncMessage()) + hasIncomingSynchronousMessage = true; } } - std::pair<std::pair<StringReference, StringReference>, uint64_t> messageAndDestination(std::make_pair(std::make_pair(messageReceiverName, messageName), destinationID)); - + // Don't even start waiting if we have InterruptWaitingIfSyncMessageArrives and there's a sync message already in the queue. + if (hasIncomingSynchronousMessage && waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives)) { + m_waitingForMessage = nullptr; + return nullptr; + } + + WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForOptions); + { - std::lock_guard<std::mutex> lock(m_waitForMessageMutex); + std::lock_guard<Lock> lock(m_waitForMessageMutex); - // We don't support having multiple clients wait for the same message. - ASSERT(!m_waitForMessageMap.contains(messageAndDestination)); - - // Insert our pending wait. - m_waitForMessageMap.set(messageAndDestination, nullptr); + // We don't support having multiple clients waiting for messages. + ASSERT(!m_waitingForMessage); + + m_waitingForMessage = &waitingForMessage; } - + + MonotonicTime absoluteTimeout = MonotonicTime::now() + timeout; + // Now wait for it to be set. while (true) { - std::unique_lock<std::mutex> lock(m_waitForMessageMutex); - - auto it = m_waitForMessageMap.find(messageAndDestination); - if (it->value) { - std::unique_ptr<MessageDecoder> decoder = std::move(it->value); - m_waitForMessageMap.remove(it); + std::unique_lock<Lock> lock(m_waitForMessageMutex); + if (m_waitingForMessage->decoder) { + auto decoder = WTFMove(m_waitingForMessage->decoder); + m_waitingForMessage = nullptr; return decoder; } // Now we wait. - if (m_waitForMessageCondition.wait_for(lock, timeout) == std::cv_status::timeout) { - // We timed out, now remove the pending wait. - m_waitForMessageMap.remove(messageAndDestination); - + bool didTimeout = !m_waitForMessageCondition.waitUntil(lock, absoluteTimeout); + // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting. + if (didTimeout || m_waitingForMessage->messageWaitingInterrupted) { + m_waitingForMessage = nullptr; break; } } @@ -429,13 +501,9 @@ std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messa return nullptr; } -std::unique_ptr<MessageDecoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<MessageEncoder> encoder, std::chrono::milliseconds timeout, unsigned syncSendFlags) +std::unique_ptr<Decoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder> encoder, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions) { - if (RunLoop::current() != m_clientRunLoop) { - // No flags are supported for synchronous messages sent from secondary threads. - ASSERT(!syncSendFlags); - return sendSyncMessageFromSecondaryThread(syncRequestID, std::move(encoder), timeout); - } + ASSERT(RunLoop::isMain()); if (!isValid()) { didFailToSendSyncMessage(); @@ -444,7 +512,7 @@ std::unique_ptr<MessageDecoder> Connection::sendSyncMessage(uint64_t syncRequest // Push the pending sync reply information on our stack. { - MutexLocker locker(m_syncReplyStateMutex); + LockHolder locker(m_syncReplyStateMutex); if (!m_shouldWaitForSyncReplies) { didFailToSendSyncMessage(); return nullptr; @@ -456,18 +524,18 @@ std::unique_ptr<MessageDecoder> Connection::sendSyncMessage(uint64_t syncRequest ++m_inSendSyncCount; // First send the message. - sendMessage(std::move(encoder), DispatchMessageEvenWhenWaitingForSyncReply); + sendMessage(WTFMove(encoder), IPC::SendOption::DispatchMessageEvenWhenWaitingForSyncReply); // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so // keep an extra reference to the connection here in case it's invalidated. Ref<Connection> protect(*this); - std::unique_ptr<MessageDecoder> reply = waitForSyncReply(syncRequestID, timeout, syncSendFlags); + std::unique_ptr<Decoder> reply = waitForSyncReply(syncRequestID, timeout, sendSyncOptions); --m_inSendSyncCount; // Finally, pop the pending sync reply information. { - MutexLocker locker(m_syncReplyStateMutex); + LockHolder locker(m_syncReplyStateMutex); ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID); m_pendingSyncReplies.removeLast(); } @@ -478,50 +546,20 @@ std::unique_ptr<MessageDecoder> Connection::sendSyncMessage(uint64_t syncRequest return reply; } -std::unique_ptr<MessageDecoder> Connection::sendSyncMessageFromSecondaryThread(uint64_t syncRequestID, std::unique_ptr<MessageEncoder> encoder, std::chrono::milliseconds timeout) -{ - ASSERT(RunLoop::current() != m_clientRunLoop); - - if (!isValid()) - return nullptr; - - SecondaryThreadPendingSyncReply pendingReply; - - // Push the pending sync reply information on our stack. - { - MutexLocker locker(m_syncReplyStateMutex); - if (!m_shouldWaitForSyncReplies) - return nullptr; - - ASSERT(!m_secondaryThreadPendingSyncReplyMap.contains(syncRequestID)); - m_secondaryThreadPendingSyncReplyMap.add(syncRequestID, &pendingReply); - } - - sendMessage(std::move(encoder), 0); - - pendingReply.semaphore.wait(currentTime() + (timeout.count() / 1000.0)); - - // Finally, pop the pending sync reply information. - { - MutexLocker locker(m_syncReplyStateMutex); - ASSERT(m_secondaryThreadPendingSyncReplyMap.contains(syncRequestID)); - m_secondaryThreadPendingSyncReplyMap.remove(syncRequestID); - } - - return std::move(pendingReply.replyDecoder); -} - -std::unique_ptr<MessageDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags) +std::unique_ptr<Decoder> Connection::waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions) { - double absoluteTime = currentTime() + (timeout.count() / 1000.0); + timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout); + WallTime absoluteTime = WallTime::now() + timeout; + willSendSyncMessage(sendSyncOptions); + bool timedOut = false; while (!timedOut) { // First, check if we have any messages that we need to process. - m_syncMessageState->dispatchMessages(0); + SyncMessageState::singleton().dispatchMessages(nullptr); { - MutexLocker locker(m_syncReplyStateMutex); + LockHolder locker(m_syncReplyStateMutex); // Second, check if there is a sync reply at the top of the stack. ASSERT(!m_pendingSyncReplies.isEmpty()); @@ -530,129 +568,192 @@ std::unique_ptr<MessageDecoder> Connection::waitForSyncReply(uint64_t syncReques ASSERT_UNUSED(syncRequestID, pendingSyncReply.syncRequestID == syncRequestID); // We found the sync reply, or the connection was closed. - if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) - return std::move(pendingSyncReply.replyDecoder); + if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) { + didReceiveSyncReply(sendSyncOptions); + return WTFMove(pendingSyncReply.replyDecoder); + } } // Processing a sync message could cause the connection to be invalidated. // (If the handler ends up calling Connection::invalidate). // If that happens, we need to stop waiting, or we'll hang since we won't get // any more incoming messages. - if (!isValid()) + if (!isValid()) { + RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Connection no longer valid, id = %" PRIu64, syncRequestID); + didReceiveSyncReply(sendSyncOptions); return nullptr; + } // We didn't find a sync reply yet, keep waiting. // This allows the WebProcess to still serve clients while waiting for the message to return. // Notably, it can continue to process accessibility requests, which are on the main thread. - if (syncSendFlags & SpinRunLoopWhileWaitingForReply) { -#if PLATFORM(MAC) - // FIXME: Although we run forever, any events incoming will cause us to drop out and exit out. This however doesn't - // account for a timeout value passed in. Timeout is always NoTimeout in these cases, but that could change. - RunLoop::current()->runForDuration(1e10); - timedOut = currentTime() >= absoluteTime; -#endif - } else - timedOut = !m_syncMessageState->wait(absoluteTime); - + timedOut = !SyncMessageState::singleton().wait(absoluteTime); } + RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Timed-out while waiting for reply, id = %" PRIu64, syncRequestID); + didReceiveSyncReply(sendSyncOptions); + return nullptr; } -void Connection::processIncomingSyncReply(std::unique_ptr<MessageDecoder> decoder) +void Connection::processIncomingSyncReply(std::unique_ptr<Decoder> decoder) { - MutexLocker locker(m_syncReplyStateMutex); + { + LockHolder locker(m_syncReplyStateMutex); - // Go through the stack of sync requests that have pending replies and see which one - // this reply is for. - for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) { - PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1]; + // Go through the stack of sync requests that have pending replies and see which one + // this reply is for. + for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) { + PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1]; - if (pendingSyncReply.syncRequestID != decoder->destinationID()) - continue; + if (pendingSyncReply.syncRequestID != decoder->destinationID()) + continue; - ASSERT(!pendingSyncReply.replyDecoder); + ASSERT(!pendingSyncReply.replyDecoder); - pendingSyncReply.replyDecoder = std::move(decoder); - pendingSyncReply.didReceiveReply = true; + pendingSyncReply.replyDecoder = WTFMove(decoder); + pendingSyncReply.didReceiveReply = true; - // We got a reply to the last send message, wake up the client run loop so it can be processed. - if (i == m_pendingSyncReplies.size()) - m_syncMessageState->wakeUpClientRunLoop(); + // We got a reply to the last send message, wake up the client run loop so it can be processed. + if (i == m_pendingSyncReplies.size()) + SyncMessageState::singleton().wakeUpClientRunLoop(); - return; + return; + } } - // If it's not a reply to any primary thread message, check if it is a reply to a secondary thread one. - SecondaryThreadPendingSyncReplyMap::iterator secondaryThreadReplyMapItem = m_secondaryThreadPendingSyncReplyMap.find(decoder->destinationID()); - if (secondaryThreadReplyMapItem != m_secondaryThreadPendingSyncReplyMap.end()) { - SecondaryThreadPendingSyncReply* reply = secondaryThreadReplyMapItem->value; - ASSERT(!reply->replyDecoder); - reply->replyDecoder = std::move(decoder); - reply->semaphore.signal(); + { + LockHolder locker(m_replyHandlersLock); + + auto replyHandler = m_replyHandlers.take(decoder->destinationID()); + if (replyHandler.dispatcher) { + replyHandler.dispatcher->dispatch([protectedThis = makeRef(*this), handler = WTFMove(replyHandler.handler), decoder = WTFMove(decoder)] () mutable { + if (!protectedThis->isValid()) { + handler(nullptr); + return; + } + + handler(WTFMove(decoder)); + }); + } } // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map. // This can happen if the send timed out, so it's fine to ignore. } -void Connection::processIncomingMessage(std::unique_ptr<MessageDecoder> message) +void Connection::processIncomingMessage(std::unique_ptr<Decoder> message) { ASSERT(!message->messageReceiverName().isEmpty()); ASSERT(!message->messageName().isEmpty()); if (message->messageReceiverName() == "IPC" && message->messageName() == "SyncMessageReply") { - processIncomingSyncReply(std::move(message)); + processIncomingSyncReply(WTFMove(message)); return; } if (!m_workQueueMessageReceivers.isValidKey(message->messageReceiverName())) { - if (message->messageReceiverName().isEmpty() && message->messageName().isEmpty()) { - // Something went wrong when decoding the message. Encode the message length so we can figure out if this - // happens for certain message lengths. - CString messageReceiverName = "<unknown message>"; - CString messageName = String::format("<message length: %zu bytes>", message->length()).utf8(); - - m_clientRunLoop->dispatch(bind(&Connection::dispatchDidReceiveInvalidMessage, this, messageReceiverName, messageName)); - return; - } - - m_clientRunLoop->dispatch(bind(&Connection::dispatchDidReceiveInvalidMessage, this, message->messageReceiverName().toString(), message->messageName().toString())); + RefPtr<Connection> protectedThis(this); + StringReference messageReceiverNameReference = message->messageReceiverName(); + String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size())); + StringReference messageNameReference = message->messageName(); + String messageName(messageNameReference.isEmpty() ? "<unknown message>" : String(messageNameReference.data(), messageNameReference.size())); + + RunLoop::main().dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), messageName = WTFMove(messageName)]() mutable { + protectedThis->dispatchDidReceiveInvalidMessage(messageReceiverName.utf8(), messageName.utf8()); + }); return; } auto it = m_workQueueMessageReceivers.find(message->messageReceiverName()); if (it != m_workQueueMessageReceivers.end()) { - it->value.first->dispatch(bind(&Connection::dispatchWorkQueueMessageReceiverMessage, this, it->value.second, message.release())); + it->value.first->dispatch([protectedThis = makeRef(*this), workQueueMessageReceiver = it->value.second, decoder = WTFMove(message)]() mutable { + protectedThis->dispatchWorkQueueMessageReceiverMessage(*workQueueMessageReceiver, *decoder); + }); return; } +#if HAVE(QOS_CLASSES) + if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) { + pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, adjustedQOSClass(QOS_CLASS_USER_INTERACTIVE), 0); + message->setQOSClassOverride(override); + } +#endif + + if (message->isSyncMessage()) { + std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex); + + for (auto& callback : m_incomingSyncMessageCallbacks.values()) + m_incomingSyncMessageCallbackQueue->dispatch(WTFMove(callback)); + + m_incomingSyncMessageCallbacks.clear(); + } + // Check if this is a sync message or if it's a message that should be dispatched even when waiting for // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched. // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply. - if (m_syncMessageState->processIncomingMessage(this, message)) + if (SyncMessageState::singleton().processIncomingMessage(*this, message)) return; // Check if we're waiting for this message. { - std::lock_guard<std::mutex> lock(m_waitForMessageMutex); + std::lock_guard<Lock> lock(m_waitForMessageMutex); + + if (m_waitingForMessage && !m_waitingForMessage->decoder) { + if (m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) { + m_waitingForMessage->decoder = WTFMove(message); + ASSERT(m_waitingForMessage->decoder); + m_waitForMessageCondition.notifyOne(); + return; + } - auto it = m_waitForMessageMap.find(std::make_pair(std::make_pair(message->messageReceiverName(), message->messageName()), message->destinationID())); - if (it != m_waitForMessageMap.end()) { - it->value = std::move(message); - ASSERT(it->value); - - m_waitForMessageCondition.notify_one(); - return; + if (m_waitingForMessage->waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) { + m_waitingForMessage->messageWaitingInterrupted = true; + m_waitForMessageCondition.notifyOne(); + } } } - enqueueIncomingMessage(std::move(message)); + enqueueIncomingMessage(WTFMove(message)); +} + +uint64_t Connection::installIncomingSyncMessageCallback(std::function<void ()> callback) +{ + std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex); + + m_nextIncomingSyncMessageCallbackID++; + + if (!m_incomingSyncMessageCallbackQueue) + m_incomingSyncMessageCallbackQueue = WorkQueue::create("com.apple.WebKit.IPC.IncomingSyncMessageCallbackQueue"); + + m_incomingSyncMessageCallbacks.add(m_nextIncomingSyncMessageCallbackID, callback); + + return m_nextIncomingSyncMessageCallbackID; +} + +void Connection::uninstallIncomingSyncMessageCallback(uint64_t callbackID) +{ + std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex); + m_incomingSyncMessageCallbacks.remove(callbackID); +} + +bool Connection::hasIncomingSyncMessage() +{ + std::lock_guard<Lock> lock(m_incomingMessagesMutex); + + for (auto& message : m_incomingMessages) { + if (message->isSyncMessage()) + return true; + } + + return false; } void Connection::postConnectionDidCloseOnConnectionWorkQueue() { - m_connectionQueue->dispatch(WTF::bind(&Connection::connectionDidClose, this)); + m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable { + protectedThis->connectionDidClose(); + }); } void Connection::connectionDidClose() @@ -661,38 +762,48 @@ void Connection::connectionDidClose() platformInvalidate(); { - MutexLocker locker(m_syncReplyStateMutex); + LockHolder locker(m_replyHandlersLock); + for (auto& replyHandler : m_replyHandlers.values()) { + replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] { + handler(nullptr); + }); + } + + m_replyHandlers.clear(); + } + + { + LockHolder locker(m_syncReplyStateMutex); ASSERT(m_shouldWaitForSyncReplies); m_shouldWaitForSyncReplies = false; if (!m_pendingSyncReplies.isEmpty()) - m_syncMessageState->wakeUpClientRunLoop(); + SyncMessageState::singleton().wakeUpClientRunLoop(); + } - for (SecondaryThreadPendingSyncReplyMap::iterator iter = m_secondaryThreadPendingSyncReplyMap.begin(); iter != m_secondaryThreadPendingSyncReplyMap.end(); ++iter) - iter->value->semaphore.signal(); + { + std::lock_guard<Lock> lock(m_waitForMessageMutex); + if (m_waitingForMessage) + m_waitingForMessage->messageWaitingInterrupted = true; } + m_waitForMessageCondition.notifyAll(); if (m_didCloseOnConnectionWorkQueueCallback) m_didCloseOnConnectionWorkQueueCallback(this); - m_clientRunLoop->dispatch(WTF::bind(&Connection::dispatchConnectionDidClose, this)); -} + RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable { + // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called, + // then the the connection will be invalid here. + if (!protectedThis->isValid()) + return; -void Connection::dispatchConnectionDidClose() -{ - // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called, - // then the client will be null here. - if (!m_client) - return; + // Set m_isValid to false before calling didClose, otherwise, sendSync will try to send a message + // to the connection and will then wait indefinitely for a reply. + protectedThis->m_isValid = false; - // Because we define a connection as being "valid" based on wheter it has a null client, we null out - // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and - // will then wait indefinitely for a reply. - Client* client = m_client; - m_client = 0; - - client->didClose(this); + protectedThis->m_client.didClose(protectedThis.get()); + }); } bool Connection::canSendOutgoingMessages() const @@ -706,21 +817,21 @@ void Connection::sendOutgoingMessages() return; while (true) { - std::unique_ptr<MessageEncoder> message; + std::unique_ptr<Encoder> message; { - MutexLocker locker(m_outgoingMessagesLock); + std::lock_guard<Lock> lock(m_outgoingMessagesMutex); if (m_outgoingMessages.isEmpty()) break; message = m_outgoingMessages.takeFirst(); } - if (!sendOutgoingMessage(std::move(message))) + if (!sendOutgoingMessage(WTFMove(message))) break; } } -void Connection::dispatchSyncMessage(MessageDecoder& decoder) +void Connection::dispatchSyncMessage(Decoder& decoder) { ASSERT(decoder.isSyncMessage()); @@ -731,26 +842,38 @@ void Connection::dispatchSyncMessage(MessageDecoder& decoder) return; } - auto replyEncoder = std::make_unique<MessageEncoder>("IPC", "SyncMessageReply", syncRequestID); + auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID); - // Hand off both the decoder and encoder to the client. - m_client->didReceiveSyncMessage(this, decoder, replyEncoder); + if (decoder.messageReceiverName() == "IPC" && decoder.messageName() == "WrappedAsyncMessageForTesting") { + if (!m_fullySynchronousModeIsAllowedForTesting) { + decoder.markInvalid(); + return; + } + std::unique_ptr<Decoder> unwrappedDecoder = Decoder::unwrapForTesting(decoder); + RELEASE_ASSERT(unwrappedDecoder); + processIncomingMessage(WTFMove(unwrappedDecoder)); + + SyncMessageState::singleton().dispatchMessages(nullptr); + } else { + // Hand off both the decoder and encoder to the client. + m_client.didReceiveSyncMessage(*this, decoder, replyEncoder); + } // FIXME: If the message was invalid, we should send back a SyncMessageError. ASSERT(!decoder.isInvalid()); if (replyEncoder) - sendSyncReply(std::move(replyEncoder)); + sendSyncReply(WTFMove(replyEncoder)); } void Connection::dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString) { - ASSERT(RunLoop::current() == m_clientRunLoop); + ASSERT(RunLoop::isMain()); - if (!m_client) + if (!isValid()) return; - m_client->didReceiveInvalidMessage(this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length())); + m_client.didReceiveInvalidMessage(*this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length())); } void Connection::didFailToSendSyncMessage() @@ -761,28 +884,36 @@ void Connection::didFailToSendSyncMessage() exit(0); } -void Connection::enqueueIncomingMessage(std::unique_ptr<MessageDecoder> incomingMessage) +void Connection::enqueueIncomingMessage(std::unique_ptr<Decoder> incomingMessage) { { - MutexLocker locker(m_incomingMessagesLock); - m_incomingMessages.append(std::move(incomingMessage)); + std::lock_guard<Lock> lock(m_incomingMessagesMutex); + m_incomingMessages.append(WTFMove(incomingMessage)); } - m_clientRunLoop->dispatch(WTF::bind(&Connection::dispatchOneMessage, this)); + RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable { + protectedThis->dispatchOneMessage(); + }); } -void Connection::dispatchMessage(MessageDecoder& decoder) +void Connection::dispatchMessage(Decoder& decoder) { - m_client->didReceiveMessage(this, decoder); + m_client.didReceiveMessage(*this, decoder); } -void Connection::dispatchMessage(std::unique_ptr<MessageDecoder> message) +void Connection::dispatchMessage(std::unique_ptr<Decoder> message) { - // If there's no client, return. We do this after calling releaseArguments so that - // the ArgumentDecoder message will be freed. - if (!m_client) + if (!isValid()) return; + if (message->shouldUseFullySynchronousModeForTesting()) { + if (!m_fullySynchronousModeIsAllowedForTesting) { + m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName()); + return; + } + m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting++; + } + m_inDispatchMessageCount++; if (message->shouldDispatchMessageWhenWaitingForSyncReply()) @@ -799,33 +930,38 @@ void Connection::dispatchMessage(std::unique_ptr<MessageDecoder> message) m_didReceiveInvalidMessage |= message->isInvalid(); m_inDispatchMessageCount--; + // FIXME: For Delayed synchronous messages, we should not decrement the counter until we send a response. + // Otherwise, we would deadlock if processing the message results in a sync message back after we exit this function. if (message->shouldDispatchMessageWhenWaitingForSyncReply()) m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--; - if (m_didReceiveInvalidMessage && m_client) - m_client->didReceiveInvalidMessage(this, message->messageReceiverName(), message->messageName()); + if (message->shouldUseFullySynchronousModeForTesting()) + m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting--; + + if (m_didReceiveInvalidMessage && isValid()) + m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName()); m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage; } void Connection::dispatchOneMessage() { - std::unique_ptr<MessageDecoder> message; + std::unique_ptr<Decoder> message; { - MutexLocker locker(m_incomingMessagesLock); + std::lock_guard<Lock> lock(m_incomingMessagesMutex); if (m_incomingMessages.isEmpty()) return; message = m_incomingMessages.takeFirst(); } - dispatchMessage(std::move(message)); + dispatchMessage(WTFMove(message)); } void Connection::wakeUpRunLoop() { - m_clientRunLoop->wakeUp(); + RunLoop::main().wakeUp(); } } // namespace IPC |