/* * Copyright (C) 2010-2016 Apple Inc. All rights reserved. * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies) * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #pragma once #include "Decoder.h" #include "Encoder.h" #include "HandleMessage.h" #include "MessageReceiver.h" #include #include #include #include #include #include #include #include #include #if OS(DARWIN) && !USE(UNIX_DOMAIN_SOCKETS) #include #include #include #endif #if PLATFORM(GTK) #include "GSocketMonitor.h" #endif namespace IPC { enum class SendOption { // Whether this message should be dispatched when waiting for a sync reply. // This is the default for synchronous messages. DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0, }; enum class SendSyncOption { // Use this to inform that this sync call will suspend this process until the user responds with input. InformPlatformProcessWillSuspend = 1 << 0, UseFullySynchronousModeForTesting = 1 << 1, }; enum class WaitForOption { // Use this to make waitForMessage be interrupted immediately by any incoming sync messages. InterruptWaitingIfSyncMessageArrives = 1 << 0, }; #define MESSAGE_CHECK_BASE(assertion, connection) do \ if (!(assertion)) { \ ASSERT(assertion); \ (connection)->markCurrentlyDispatchedMessageAsInvalid(); \ return; \ } \ while (0) class MachMessage; class UnixMessage; class Connection : public ThreadSafeRefCounted { public: class Client : public MessageReceiver { public: virtual void didClose(Connection&) = 0; virtual void didReceiveInvalidMessage(Connection&, StringReference messageReceiverName, StringReference messageName) = 0; protected: virtual ~Client() { } }; class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted { }; #if USE(UNIX_DOMAIN_SOCKETS) typedef int Identifier; static bool identifierIsNull(Identifier identifier) { return identifier == -1; } struct SocketPair { int client; int server; }; enum ConnectionOptions { SetCloexecOnClient = 1 << 0, SetCloexecOnServer = 1 << 1, }; static Connection::SocketPair createPlatformConnection(unsigned options = SetCloexecOnClient | SetCloexecOnServer); #elif OS(DARWIN) struct Identifier { Identifier() : port(MACH_PORT_NULL) { } Identifier(mach_port_t port) : port(port) { } Identifier(mach_port_t port, OSObjectPtr xpcConnection) : port(port) , xpcConnection(WTFMove(xpcConnection)) { } mach_port_t port; OSObjectPtr xpcConnection; }; static bool identifierIsNull(Identifier identifier) { return identifier.port == MACH_PORT_NULL; } xpc_connection_t xpcConnection() const { return m_xpcConnection.get(); } bool getAuditToken(audit_token_t&); pid_t remoteProcessID() const; #endif static Ref createServerConnection(Identifier, Client&); static Ref createClientConnection(Identifier, Client&); ~Connection(); Client& client() const { return m_client; } #if PLATFORM(MAC) && __MAC_OS_X_VERSION_MIN_REQUIRED <= 101000 void setShouldCloseConnectionOnMachExceptions(); #endif void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool); void setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure); // The set callback will be called on the connection work queue when the connection is closed, // before didCall is called on the client thread. Must be called before the connection is opened. // In the future we might want a more generic way to handle sync or async messages directly // on the work queue, for example if we want to handle them on some other thread we could avoid // handling the message on the client thread first. typedef void (*DidCloseOnConnectionWorkQueueCallback)(Connection*); void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback); void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue&, WorkQueueMessageReceiver*); void removeWorkQueueMessageReceiver(StringReference messageReceiverName); bool open(); void invalidate(); void markCurrentlyDispatchedMessageAsInvalid(); void postConnectionDidCloseOnConnectionWorkQueue(); template bool send(T&& message, uint64_t destinationID, OptionSet sendOptions = { }); template void sendWithReply(T&& message, uint64_t destinationID, FunctionDispatcher& replyDispatcher, Function::Type>)>&& replyHandler); template bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, Seconds timeout = Seconds::infinity(), OptionSet sendSyncOptions = { }); template bool waitForAndDispatchImmediately(uint64_t destinationID, Seconds timeout, OptionSet waitForOptions = { }); bool sendMessage(std::unique_ptr, OptionSet sendOptions); void sendMessageWithReply(uint64_t requestID, std::unique_ptr, FunctionDispatcher& replyDispatcher, Function)>&& replyHandler); std::unique_ptr createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID); std::unique_ptr sendSyncMessage(uint64_t syncRequestID, std::unique_ptr, Seconds timeout, OptionSet sendSyncOptions); bool sendSyncReply(std::unique_ptr); void wakeUpRunLoop(); void incrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { ++m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } void decrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { --m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } bool inSendSync() const { return m_inSendSyncCount; } Identifier identifier() const; #if PLATFORM(COCOA) bool kill(); void terminateSoon(double intervalInSeconds); #endif bool isValid() const { return m_isValid; } #if HAVE(QOS_CLASSES) void setShouldBoostMainThreadOnSyncMessage(bool b) { m_shouldBoostMainThreadOnSyncMessage = b; } #endif uint64_t installIncomingSyncMessageCallback(std::function); void uninstallIncomingSyncMessageCallback(uint64_t); bool hasIncomingSyncMessage(); void allowFullySynchronousModeForTesting() { m_fullySynchronousModeIsAllowedForTesting = true; } void ignoreTimeoutsForTesting() { m_ignoreTimeoutsForTesting = true; } private: Connection(Identifier, bool isServer, Client&); void platformInitialize(Identifier); void platformInvalidate(); std::unique_ptr waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet); std::unique_ptr waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet); // Called on the connection work queue. void processIncomingMessage(std::unique_ptr); void processIncomingSyncReply(std::unique_ptr); void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver&, Decoder&); bool canSendOutgoingMessages() const; bool platformCanSendOutgoingMessages() const; void sendOutgoingMessages(); bool sendOutgoingMessage(std::unique_ptr); void connectionDidClose(); // Called on the listener thread. void dispatchOneMessage(); void dispatchMessage(std::unique_ptr); void dispatchMessage(Decoder&); void dispatchSyncMessage(Decoder&); void dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString); void didFailToSendSyncMessage(); // Can be called on any thread. void enqueueIncomingMessage(std::unique_ptr); void willSendSyncMessage(OptionSet); void didReceiveSyncReply(OptionSet); Seconds timeoutRespectingIgnoreTimeoutsForTesting(Seconds) const; #if PLATFORM(COCOA) bool sendMessage(std::unique_ptr); #endif Client& m_client; bool m_isServer; std::atomic m_isValid { true }; std::atomic m_syncRequestID; bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage; bool m_shouldExitOnSyncMessageSendFailure; DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback; bool m_isConnected; Ref m_connectionQueue; HashMap, RefPtr>> m_workQueueMessageReceivers; unsigned m_inSendSyncCount; unsigned m_inDispatchMessageCount; unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; unsigned m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting { 0 }; bool m_fullySynchronousModeIsAllowedForTesting { false }; bool m_ignoreTimeoutsForTesting { false }; bool m_didReceiveInvalidMessage; // Incoming messages. Lock m_incomingMessagesMutex; Deque> m_incomingMessages; // Outgoing messages. Lock m_outgoingMessagesMutex; Deque> m_outgoingMessages; Condition m_waitForMessageCondition; Lock m_waitForMessageMutex; struct ReplyHandler; Lock m_replyHandlersLock; HashMap m_replyHandlers; struct WaitForMessageState; WaitForMessageState* m_waitingForMessage; class SyncMessageState; Lock m_syncReplyStateMutex; bool m_shouldWaitForSyncReplies; struct PendingSyncReply; Vector m_pendingSyncReplies; Lock m_incomingSyncMessageCallbackMutex; HashMap> m_incomingSyncMessageCallbacks; RefPtr m_incomingSyncMessageCallbackQueue; uint64_t m_nextIncomingSyncMessageCallbackID { 0 }; #if HAVE(QOS_CLASSES) pthread_t m_mainThread { 0 }; bool m_shouldBoostMainThreadOnSyncMessage { false }; #endif #if USE(UNIX_DOMAIN_SOCKETS) // Called on the connection queue. void readyReadHandler(); bool processMessage(); bool sendOutputMessage(UnixMessage&); Vector m_readBuffer; Vector m_fileDescriptors; int m_socketDescriptor; std::unique_ptr m_pendingOutputMessage; #if PLATFORM(GTK) GRefPtr m_socket; GSocketMonitor m_readSocketMonitor; GSocketMonitor m_writeSocketMonitor; #endif #elif OS(DARWIN) // Called on the connection queue. void receiveSourceEventHandler(); void initializeSendSource(); mach_port_t m_sendPort; dispatch_source_t m_sendSource; mach_port_t m_receivePort; dispatch_source_t m_receiveSource; std::unique_ptr m_pendingOutgoingMachMessage; #if PLATFORM(MAC) && __MAC_OS_X_VERSION_MIN_REQUIRED <= 101000 void exceptionSourceEventHandler(); // If setShouldCloseConnectionOnMachExceptions has been called, this has // the exception port that exceptions from the other end will be sent on. mach_port_t m_exceptionPort; dispatch_source_t m_exceptionPortDataAvailableSource; #endif OSObjectPtr m_xpcConnection; #endif }; template bool Connection::send(T&& message, uint64_t destinationID, OptionSet sendOptions) { COMPILE_ASSERT(!T::isSync, AsyncMessageExpected); auto encoder = std::make_unique(T::receiverName(), T::name(), destinationID); encoder->encode(message.arguments()); return sendMessage(WTFMove(encoder), sendOptions); } template void Connection::sendWithReply(T&& message, uint64_t destinationID, FunctionDispatcher& replyDispatcher, Function::Type>)>&& replyHandler) { uint64_t requestID = 0; std::unique_ptr encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, requestID); encoder->encode(message.arguments()); sendMessageWithReply(requestID, WTFMove(encoder), replyDispatcher, [replyHandler = WTFMove(replyHandler)](std::unique_ptr decoder) { if (decoder) { typename CodingType::Type reply; if (decoder->decode(reply)) { replyHandler(WTFMove(reply)); return; } } replyHandler(std::nullopt); }); } template bool Connection::sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, Seconds timeout, OptionSet sendSyncOptions) { COMPILE_ASSERT(T::isSync, SyncMessageExpected); uint64_t syncRequestID = 0; std::unique_ptr encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID); if (sendSyncOptions.contains(SendSyncOption::UseFullySynchronousModeForTesting)) { encoder->setFullySynchronousModeForTesting(); m_fullySynchronousModeIsAllowedForTesting = true; } // Encode the rest of the input arguments. encoder->encode(message.arguments()); // Now send the message and wait for a reply. std::unique_ptr replyDecoder = sendSyncMessage(syncRequestID, WTFMove(encoder), timeout, sendSyncOptions); if (!replyDecoder) return false; // Decode the reply. return replyDecoder->decode(reply); } template bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, Seconds timeout, OptionSet waitForOptions) { std::unique_ptr decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForOptions); if (!decoder) return false; ASSERT(decoder->destinationID() == destinationID); m_client.didReceiveMessage(*this, *decoder); return true; } } // namespace IPC