diff options
author | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
---|---|---|
committer | Lorry Tar Creator <lorry-tar-importer@lorry> | 2017-06-27 06:07:23 +0000 |
commit | 1bf1084f2b10c3b47fd1a588d85d21ed0eb41d0c (patch) | |
tree | 46dcd36c86e7fbc6e5df36deb463b33e9967a6f7 /Source/WebKit2/Platform/IPC/Connection.h | |
parent | 32761a6cee1d0dee366b885b7b9c777e67885688 (diff) | |
download | WebKitGtk-tarball-master.tar.gz |
webkitgtk-2.16.5HEADwebkitgtk-2.16.5master
Diffstat (limited to 'Source/WebKit2/Platform/IPC/Connection.h')
-rw-r--r-- | Source/WebKit2/Platform/IPC/Connection.h | 340 |
1 files changed, 198 insertions, 142 deletions
diff --git a/Source/WebKit2/Platform/IPC/Connection.h b/Source/WebKit2/Platform/IPC/Connection.h index 92df2689c..a5a8ad7b4 100644 --- a/Source/WebKit2/Platform/IPC/Connection.h +++ b/Source/WebKit2/Platform/IPC/Connection.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2010 Apple Inc. All rights reserved. + * Copyright (C) 2010-2016 Apple Inc. All rights reserved. * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies) * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved. * @@ -25,48 +25,51 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef Connection_h -#define Connection_h +#pragma once -#include "Arguments.h" -#include "MessageDecoder.h" -#include "MessageEncoder.h" +#include "Decoder.h" +#include "Encoder.h" +#include "HandleMessage.h" #include "MessageReceiver.h" -#include "WorkQueue.h" #include <atomic> -#include <condition_variable> +#include <wtf/Condition.h> #include <wtf/Deque.h> #include <wtf/Forward.h> -#include <wtf/PassRefPtr.h> -#include <wtf/OwnPtr.h> +#include <wtf/HashMap.h> +#include <wtf/Lock.h> +#include <wtf/OptionSet.h> +#include <wtf/WorkQueue.h> #include <wtf/text/CString.h> -#if OS(DARWIN) +#if OS(DARWIN) && !USE(UNIX_DOMAIN_SOCKETS) #include <mach/mach_port.h> -#include <xpc/xpc.h> +#include <wtf/OSObjectPtr.h> +#include <wtf/spi/darwin/XPCSPI.h> #endif -#if PLATFORM(GTK) || PLATFORM(EFL) -#include "PlatformProcessIdentifier.h" +#if PLATFORM(GTK) +#include "GSocketMonitor.h" #endif -namespace WTF { -class RunLoop; -} - namespace IPC { -enum MessageSendFlags { +enum class SendOption { // Whether this message should be dispatched when waiting for a sync reply. // This is the default for synchronous messages. DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0, }; -enum SyncMessageSendFlags { - // Will allow events to continue being handled while waiting for the sync reply. - SpinRunLoopWhileWaitingForReply = 1 << 0, +enum class SendSyncOption { + // Use this to inform that this sync call will suspend this process until the user responds with input. + InformPlatformProcessWillSuspend = 1 << 0, + UseFullySynchronousModeForTesting = 1 << 1, }; - + +enum class WaitForOption { + // Use this to make waitForMessage be interrupted immediately by any incoming sync messages. + InterruptWaitingIfSyncMessageArrives = 1 << 0, +}; + #define MESSAGE_CHECK_BASE(assertion, connection) do \ if (!(assertion)) { \ ASSERT(assertion); \ @@ -75,12 +78,15 @@ enum SyncMessageSendFlags { } \ while (0) +class MachMessage; +class UnixMessage; + class Connection : public ThreadSafeRefCounted<Connection> { public: class Client : public MessageReceiver { public: - virtual void didClose(Connection*) = 0; - virtual void didReceiveInvalidMessage(Connection*, StringReference messageReceiverName, StringReference messageName) = 0; + virtual void didClose(Connection&) = 0; + virtual void didReceiveInvalidMessage(Connection&, StringReference messageReceiverName, StringReference messageName) = 0; protected: virtual ~Client() { } @@ -89,54 +95,55 @@ public: class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<WorkQueueMessageReceiver> { }; -#if OS(DARWIN) +#if USE(UNIX_DOMAIN_SOCKETS) + typedef int Identifier; + static bool identifierIsNull(Identifier identifier) { return identifier == -1; } + + struct SocketPair { + int client; + int server; + }; + + enum ConnectionOptions { + SetCloexecOnClient = 1 << 0, + SetCloexecOnServer = 1 << 1, + }; + + static Connection::SocketPair createPlatformConnection(unsigned options = SetCloexecOnClient | SetCloexecOnServer); +#elif OS(DARWIN) struct Identifier { Identifier() : port(MACH_PORT_NULL) - , xpcConnection(0) { } Identifier(mach_port_t port) : port(port) - , xpcConnection(0) { } - Identifier(mach_port_t port, xpc_connection_t xpcConnection) + Identifier(mach_port_t port, OSObjectPtr<xpc_connection_t> xpcConnection) : port(port) - , xpcConnection(xpcConnection) + , xpcConnection(WTFMove(xpcConnection)) { } mach_port_t port; - xpc_connection_t xpcConnection; + OSObjectPtr<xpc_connection_t> xpcConnection; }; static bool identifierIsNull(Identifier identifier) { return identifier.port == MACH_PORT_NULL; } -#elif USE(UNIX_DOMAIN_SOCKETS) - typedef int Identifier; - static bool identifierIsNull(Identifier identifier) { return !identifier; } - - struct SocketPair { - int client; - int server; - }; - - enum ConnectionOptions { - SetCloexecOnClient = 1 << 0, - SetCloexecOnServer = 1 << 1, - }; - - static Connection::SocketPair createPlatformConnection(unsigned options = SetCloexecOnClient | SetCloexecOnServer); + xpc_connection_t xpcConnection() const { return m_xpcConnection.get(); } + bool getAuditToken(audit_token_t&); + pid_t remoteProcessID() const; #endif - static PassRefPtr<Connection> createServerConnection(Identifier, Client*, WTF::RunLoop* clientRunLoop); - static PassRefPtr<Connection> createClientConnection(Identifier, Client*, WTF::RunLoop* clientRunLoop); + static Ref<Connection> createServerConnection(Identifier, Client&); + static Ref<Connection> createClientConnection(Identifier, Client&); ~Connection(); - Client* client() const { return m_client; } + Client& client() const { return m_client; } -#if OS(DARWIN) +#if PLATFORM(MAC) && __MAC_OS_X_VERSION_MIN_REQUIRED <= 101000 void setShouldCloseConnectionOnMachExceptions(); #endif @@ -151,7 +158,7 @@ public: typedef void (*DidCloseOnConnectionWorkQueueCallback)(Connection*); void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback); - void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue*, WorkQueueMessageReceiver*); + void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue&, WorkQueueMessageReceiver*); void removeWorkQueueMessageReceiver(StringReference messageReceiverName); bool open(); @@ -160,15 +167,16 @@ public: void postConnectionDidCloseOnConnectionWorkQueue(); - template<typename T> bool send(T&& message, uint64_t destinationID, unsigned messageSendFlags = 0); - template<typename T> bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), unsigned syncSendFlags = 0); - template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout); + template<typename T> bool send(T&& message, uint64_t destinationID, OptionSet<SendOption> sendOptions = { }); + template<typename T> void sendWithReply(T&& message, uint64_t destinationID, FunctionDispatcher& replyDispatcher, Function<void (std::optional<typename CodingType<typename T::Reply>::Type>)>&& replyHandler); + template<typename T> bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, Seconds timeout = Seconds::infinity(), OptionSet<SendSyncOption> sendSyncOptions = { }); + template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions = { }); - std::unique_ptr<MessageEncoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID); - bool sendMessage(std::unique_ptr<MessageEncoder>, unsigned messageSendFlags = 0); - std::unique_ptr<MessageDecoder> sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<MessageEncoder>, std::chrono::milliseconds timeout, unsigned syncSendFlags = 0); - std::unique_ptr<MessageDecoder> sendSyncMessageFromSecondaryThread(uint64_t syncRequestID, std::unique_ptr<MessageEncoder>, std::chrono::milliseconds timeout); - bool sendSyncReply(std::unique_ptr<MessageEncoder>); + bool sendMessage(std::unique_ptr<Encoder>, OptionSet<SendOption> sendOptions); + void sendMessageWithReply(uint64_t requestID, std::unique_ptr<Encoder>, FunctionDispatcher& replyDispatcher, Function<void (std::unique_ptr<Decoder>)>&& replyHandler); + std::unique_ptr<Encoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID); + std::unique_ptr<Decoder> sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder>, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions); + bool sendSyncReply(std::unique_ptr<Encoder>); void wakeUpRunLoop(); @@ -177,45 +185,71 @@ public: bool inSendSync() const { return m_inSendSyncCount; } + Identifier identifier() const; + +#if PLATFORM(COCOA) + bool kill(); + void terminateSoon(double intervalInSeconds); +#endif + + bool isValid() const { return m_isValid; } + +#if HAVE(QOS_CLASSES) + void setShouldBoostMainThreadOnSyncMessage(bool b) { m_shouldBoostMainThreadOnSyncMessage = b; } +#endif + + uint64_t installIncomingSyncMessageCallback(std::function<void ()>); + void uninstallIncomingSyncMessageCallback(uint64_t); + bool hasIncomingSyncMessage(); + + void allowFullySynchronousModeForTesting() { m_fullySynchronousModeIsAllowedForTesting = true; } + + void ignoreTimeoutsForTesting() { m_ignoreTimeoutsForTesting = true; } + private: - Connection(Identifier, bool isServer, Client*, WTF::RunLoop* clientRunLoop); + Connection(Identifier, bool isServer, Client&); void platformInitialize(Identifier); void platformInvalidate(); - bool isValid() const { return m_client; } + std::unique_ptr<Decoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption>); - std::unique_ptr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout); - - std::unique_ptr<MessageDecoder> waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags); + std::unique_ptr<Decoder> waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption>); // Called on the connection work queue. - void processIncomingMessage(std::unique_ptr<MessageDecoder>); - void processIncomingSyncReply(std::unique_ptr<MessageDecoder>); + void processIncomingMessage(std::unique_ptr<Decoder>); + void processIncomingSyncReply(std::unique_ptr<Decoder>); - void addWorkQueueMessageReceiverOnConnectionWorkQueue(StringReference messageReceiverName, WorkQueue*, WorkQueueMessageReceiver*); - void removeWorkQueueMessageReceiverOnConnectionWorkQueue(StringReference messageReceiverName); - void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver*, MessageDecoder*); + void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver&, Decoder&); bool canSendOutgoingMessages() const; bool platformCanSendOutgoingMessages() const; void sendOutgoingMessages(); - bool sendOutgoingMessage(std::unique_ptr<MessageEncoder>); + bool sendOutgoingMessage(std::unique_ptr<Encoder>); void connectionDidClose(); // Called on the listener thread. - void dispatchConnectionDidClose(); void dispatchOneMessage(); - void dispatchMessage(std::unique_ptr<MessageDecoder>); - void dispatchMessage(MessageDecoder&); - void dispatchSyncMessage(MessageDecoder&); + void dispatchMessage(std::unique_ptr<Decoder>); + void dispatchMessage(Decoder&); + void dispatchSyncMessage(Decoder&); void dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString); void didFailToSendSyncMessage(); // Can be called on any thread. - void enqueueIncomingMessage(std::unique_ptr<MessageDecoder>); + void enqueueIncomingMessage(std::unique_ptr<Decoder>); + + void willSendSyncMessage(OptionSet<SendSyncOption>); + void didReceiveSyncReply(OptionSet<SendSyncOption>); - Client* m_client; + Seconds timeoutRespectingIgnoreTimeoutsForTesting(Seconds) const; + +#if PLATFORM(COCOA) + bool sendMessage(std::unique_ptr<MachMessage>); +#endif + + Client& m_client; bool m_isServer; + std::atomic<bool> m_isValid { true }; std::atomic<uint64_t> m_syncRequestID; bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage; @@ -223,119 +257,143 @@ private: DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback; bool m_isConnected; - RefPtr<WorkQueue> m_connectionQueue; - WTF::RunLoop* m_clientRunLoop; + Ref<WorkQueue> m_connectionQueue; HashMap<StringReference, std::pair<RefPtr<WorkQueue>, RefPtr<WorkQueueMessageReceiver>>> m_workQueueMessageReceivers; unsigned m_inSendSyncCount; unsigned m_inDispatchMessageCount; unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; + unsigned m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting { 0 }; + bool m_fullySynchronousModeIsAllowedForTesting { false }; + bool m_ignoreTimeoutsForTesting { false }; bool m_didReceiveInvalidMessage; // Incoming messages. - Mutex m_incomingMessagesLock; - Deque<std::unique_ptr<MessageDecoder>> m_incomingMessages; + Lock m_incomingMessagesMutex; + Deque<std::unique_ptr<Decoder>> m_incomingMessages; // Outgoing messages. - Mutex m_outgoingMessagesLock; - Deque<std::unique_ptr<MessageEncoder>> m_outgoingMessages; + Lock m_outgoingMessagesMutex; + Deque<std::unique_ptr<Encoder>> m_outgoingMessages; - std::condition_variable m_waitForMessageCondition; - std::mutex m_waitForMessageMutex; - HashMap<std::pair<std::pair<StringReference, StringReference>, uint64_t>, std::unique_ptr<MessageDecoder>> m_waitForMessageMap; + Condition m_waitForMessageCondition; + Lock m_waitForMessageMutex; - // Represents a sync request for which we're waiting on a reply. - struct PendingSyncReply { - // The request ID. - uint64_t syncRequestID; + struct ReplyHandler; - // The reply decoder, will be null if there was an error processing the sync - // message on the other side. - std::unique_ptr<MessageDecoder> replyDecoder; + Lock m_replyHandlersLock; + HashMap<uint64_t, ReplyHandler> m_replyHandlers; - // Will be set to true once a reply has been received. - bool didReceiveReply; - - PendingSyncReply() - : syncRequestID(0) - , didReceiveReply(false) - { - } - - explicit PendingSyncReply(uint64_t syncRequestID) - : syncRequestID(syncRequestID) - , didReceiveReply(0) - { - } - }; + struct WaitForMessageState; + WaitForMessageState* m_waitingForMessage; class SyncMessageState; - friend class SyncMessageState; - RefPtr<SyncMessageState> m_syncMessageState; - Mutex m_syncReplyStateMutex; + Lock m_syncReplyStateMutex; bool m_shouldWaitForSyncReplies; + struct PendingSyncReply; Vector<PendingSyncReply> m_pendingSyncReplies; - class SecondaryThreadPendingSyncReply; - typedef HashMap<uint64_t, SecondaryThreadPendingSyncReply*> SecondaryThreadPendingSyncReplyMap; - SecondaryThreadPendingSyncReplyMap m_secondaryThreadPendingSyncReplyMap; + Lock m_incomingSyncMessageCallbackMutex; + HashMap<uint64_t, std::function<void ()>> m_incomingSyncMessageCallbacks; + RefPtr<WorkQueue> m_incomingSyncMessageCallbackQueue; + uint64_t m_nextIncomingSyncMessageCallbackID { 0 }; + +#if HAVE(QOS_CLASSES) + pthread_t m_mainThread { 0 }; + bool m_shouldBoostMainThreadOnSyncMessage { false }; +#endif -#if OS(DARWIN) +#if USE(UNIX_DOMAIN_SOCKETS) + // Called on the connection queue. + void readyReadHandler(); + bool processMessage(); + bool sendOutputMessage(UnixMessage&); + + Vector<uint8_t> m_readBuffer; + Vector<int> m_fileDescriptors; + int m_socketDescriptor; + std::unique_ptr<UnixMessage> m_pendingOutputMessage; +#if PLATFORM(GTK) + GRefPtr<GSocket> m_socket; + GSocketMonitor m_readSocketMonitor; + GSocketMonitor m_writeSocketMonitor; +#endif +#elif OS(DARWIN) // Called on the connection queue. void receiveSourceEventHandler(); - void initializeDeadNameSource(); - void exceptionSourceEventHandler(); + void initializeSendSource(); mach_port_t m_sendPort; - dispatch_source_t m_deadNameSource; + dispatch_source_t m_sendSource; mach_port_t m_receivePort; - dispatch_source_t m_receivePortDataAvailableSource; + dispatch_source_t m_receiveSource; + + std::unique_ptr<MachMessage> m_pendingOutgoingMachMessage; +#if PLATFORM(MAC) && __MAC_OS_X_VERSION_MIN_REQUIRED <= 101000 + void exceptionSourceEventHandler(); // If setShouldCloseConnectionOnMachExceptions has been called, this has // the exception port that exceptions from the other end will be sent on. mach_port_t m_exceptionPort; dispatch_source_t m_exceptionPortDataAvailableSource; +#endif - xpc_connection_t m_xpcConnection; - -#elif USE(UNIX_DOMAIN_SOCKETS) - // Called on the connection queue. - void readyReadHandler(); - bool processMessage(); - - Vector<uint8_t> m_readBuffer; - size_t m_readBufferSize; - Vector<int> m_fileDescriptors; - size_t m_fileDescriptorsSize; - int m_socketDescriptor; + OSObjectPtr<xpc_connection_t> m_xpcConnection; #endif }; -template<typename T> bool Connection::send(T&& message, uint64_t destinationID, unsigned messageSendFlags) +template<typename T> +bool Connection::send(T&& message, uint64_t destinationID, OptionSet<SendOption> sendOptions) { COMPILE_ASSERT(!T::isSync, AsyncMessageExpected); - auto encoder = std::make_unique<MessageEncoder>(T::receiverName(), T::name(), destinationID); + auto encoder = std::make_unique<Encoder>(T::receiverName(), T::name(), destinationID); encoder->encode(message.arguments()); - return sendMessage(std::move(encoder), messageSendFlags); + return sendMessage(WTFMove(encoder), sendOptions); +} + +template<typename T> +void Connection::sendWithReply(T&& message, uint64_t destinationID, FunctionDispatcher& replyDispatcher, Function<void (std::optional<typename CodingType<typename T::Reply>::Type>)>&& replyHandler) +{ + uint64_t requestID = 0; + std::unique_ptr<Encoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, requestID); + + encoder->encode(message.arguments()); + + sendMessageWithReply(requestID, WTFMove(encoder), replyDispatcher, [replyHandler = WTFMove(replyHandler)](std::unique_ptr<Decoder> decoder) { + if (decoder) { + typename CodingType<typename T::Reply>::Type reply; + if (decoder->decode(reply)) { + replyHandler(WTFMove(reply)); + return; + } + } + + replyHandler(std::nullopt); + }); } -template<typename T> bool Connection::sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned syncSendFlags) +template<typename T> bool Connection::sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions) { COMPILE_ASSERT(T::isSync, SyncMessageExpected); uint64_t syncRequestID = 0; - std::unique_ptr<MessageEncoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID); - + std::unique_ptr<Encoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID); + + if (sendSyncOptions.contains(SendSyncOption::UseFullySynchronousModeForTesting)) { + encoder->setFullySynchronousModeForTesting(); + m_fullySynchronousModeIsAllowedForTesting = true; + } + // Encode the rest of the input arguments. encoder->encode(message.arguments()); // Now send the message and wait for a reply. - std::unique_ptr<MessageDecoder> replyDecoder = sendSyncMessage(syncRequestID, std::move(encoder), timeout, syncSendFlags); + std::unique_ptr<Decoder> replyDecoder = sendSyncMessage(syncRequestID, WTFMove(encoder), timeout, sendSyncOptions); if (!replyDecoder) return false; @@ -343,17 +401,15 @@ template<typename T> bool Connection::sendSync(T&& message, typename T::Reply&& return replyDecoder->decode(reply); } -template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout) +template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions) { - std::unique_ptr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout); + std::unique_ptr<Decoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForOptions); if (!decoder) return false; ASSERT(decoder->destinationID() == destinationID); - m_client->didReceiveMessage(this, *decoder); + m_client.didReceiveMessage(*this, *decoder); return true; } } // namespace IPC - -#endif // Connection_h |